You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/19 23:16:58 UTC

reef git commit: [REEF-1393] Consider active threads when checking applicationDispatcher.isEmpty

Repository: reef
Updated Branches:
  refs/heads/master 326eae21a -> 2b032719f


[REEF-1393] Consider active threads when checking applicationDispatcher.isEmpty

This addresses the issue by:
  * Taking active threads into account in applicationDispatcher.isEmpty.
  * Adding a thread pool that waits for an Evaluator's event handlers to
    finish and then triggers a check for the idleness of the Evaluator.

JIRA:
  [REEF-1393](https://issues.apache.org/jira/browse/REEF-1393)

Pull Request:
  This closes #1007


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2b032719
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2b032719
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2b032719

Branch: refs/heads/master
Commit: 2b032719f0c28fafd83d225b60b1f9ff89e9c6ec
Parents: 326eae2
Author: Andrew Chung <af...@gmail.com>
Authored: Wed May 18 11:38:42 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 19 16:15:03 2016 -0700

----------------------------------------------------------------------
 .../apache/reef/client/DriverConfiguration.java | 18 +++++
 .../runtime/common/driver/DriverSingletons.java |  5 +-
 .../evaluator/EvaluatorIdlenessThreadPool.java  | 80 ++++++++++++++++++++
 .../driver/evaluator/EvaluatorManager.java      | 19 +++--
 .../EvaluatorIdlenessThreadPoolSize.java        | 33 ++++++++
 .../EvaluatorIdlenessWaitInMilliseconds.java    | 33 ++++++++
 .../runtime/common/utils/DispatchingEStage.java |  2 +-
 .../apache/reef/wake/impl/ThreadPoolStage.java  | 45 +++++++----
 8 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
index a566475..6edb52f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -32,6 +32,8 @@ import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.*;
 import org.apache.reef.driver.task.*;
 import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds;
 import org.apache.reef.tang.formats.*;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.time.Clock;
@@ -177,6 +179,8 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
    */
   public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
 
+  // ***** MISC
+
   /**
    * Progress provider. See {@link ProgressProvider}.
    */
@@ -193,6 +197,18 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
   public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS = new OptionalParameter<>();
 
   /**
+   * The number of Threads in a Driver to verify the completion of Evaluators.
+   * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+   */
+  public static final OptionalParameter<Integer> EVALUATOR_IDLENESS_THREAD_POOL_SIZE = new OptionalParameter<>();
+
+  /**
+   * The number of milliseconds to wait in a loop to verify the completion of Evaluators.
+   * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+   */
+  public static final OptionalParameter<Long> EVALUATOR_IDLENESS_WAIT_IN_MS = new OptionalParameter<>();
+
+  /**
    * ConfigurationModule to fill out to get a legal Driver Configuration.
    */
   public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
@@ -235,6 +251,8 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
 
           // Various parameters
       .bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
+      .bindNamedParameter(EvaluatorIdlenessThreadPoolSize.class, EVALUATOR_IDLENESS_THREAD_POOL_SIZE)
+      .bindNamedParameter(EvaluatorIdlenessWaitInMilliseconds.class, EVALUATOR_IDLENESS_WAIT_IN_MS)
       .bindImplementation(ProgressProvider.class, PROGRESS_PROVIDER)
       .build();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
index 7b21f9a..8c31e05 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
@@ -32,6 +32,7 @@ import org.apache.reef.driver.task.*;
 import org.apache.reef.proto.ClientRuntimeProtocol;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 
@@ -98,6 +99,8 @@ final class DriverSingletons {
       // we get container failures dure to modifications
       // to already submitted global jar file
       final ResourceLaunchHandler resourceLaunchHandler,
-      final ResourceReleaseHandler resourceReleaseHandler) {
+      final ResourceReleaseHandler resourceReleaseHandler,
+
+      final EvaluatorIdlenessThreadPool evaluatorIdlenessThreadPool) {
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
new file mode 100644
index 0000000..e94ab4c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.impl.DefaultThreadFactory;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Runs threads in a thread pool to check the completion of Evaluators on the closing
+ * of an {@link EvaluatorManager} in order to trigger Evaluator idleness checks.
+ */
+@Private
+public final class EvaluatorIdlenessThreadPool {
+  private static final Logger LOG = Logger.getLogger(EvaluatorIdlenessThreadPool.class.getName());
+
+  private final ExecutorService executor;
+  private final long waitInMillis;
+
+  @Inject
+  private EvaluatorIdlenessThreadPool(@Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
+                                      @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
+
+    Validate.isTrue(waitInMillis >= 0, "EvaluatorIdlenessWaitInMilliseconds must be configured to be >= 0");
+    Validate.isTrue(numThreads > 0, "EvaluatorIdlenessThreadPoolSize must be configured to be > 0");
+
+    this.waitInMillis = waitInMillis;
+    this.executor = Executors.newFixedThreadPool(
+        numThreads, new DefaultThreadFactory(EvaluatorIdlenessThreadPool.class.getName()));
+  }
+
+  /**
+   * Asynchronously polls, on this thread pool, until the given {@link EvaluatorManager} reports it is closed,
+   * then triggers a check of its idleness source.
+   * @param manager the {@link EvaluatorManager}
+   */
+  void runCheckAsync(final EvaluatorManager manager) {
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        while (!manager.isClosed()) {
+          try {
+            Thread.sleep(waitInMillis);
+          } catch (final InterruptedException e) {
+            LOG.log(Level.SEVERE, "Thread interrupted while waiting for Evaluator to finish.");
+            throw new RuntimeException(e);
+          }
+        }
+
+        manager.checkIdlenessSource();
+        LOG.log(Level.FINE, "Evaluator " + manager.getId() + " has finished.");
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index f483868..a8bd61a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -107,6 +107,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
   private final LoggingScopeFactory loggingScopeFactory;
   private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
   private final DriverRestartManager driverRestartManager;
+  private final EvaluatorIdlenessThreadPool idlenessThreadPool;
 
   // Mutable fields
   private Optional<TaskRepresenter> task = Optional.empty();
@@ -132,7 +133,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
       final LoggingScopeFactory loggingScopeFactory,
       @Parameter(EvaluatorConfigurationProviders.class)
       final Set<ConfigurationProvider> evaluatorConfigurationProviders,
-      final DriverRestartManager driverRestartManager) {
+      final DriverRestartManager driverRestartManager,
+      final EvaluatorIdlenessThreadPool idlenessThreadPool) {
     this.contextRepresenters = contextRepresenters;
     this.idlenessSource = idlenessSource;
     LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
@@ -153,6 +155,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
     this.loggingScopeFactory = loggingScopeFactory;
     this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
     this.driverRestartManager = driverRestartManager;
+    this.idlenessThreadPool = idlenessThreadPool;
 
     LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
   }
@@ -264,12 +267,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
       }
     }
 
-    try {
-      this.messageDispatcher.close();
-    } catch (Exception e) {
-      LOG.log(Level.SEVERE, "Exception while closing EvaluatorManager", e);
-    }
-    this.idlenessSource.check();
+    idlenessThreadPool.runCheckAsync(this);
   }
 
   /**
@@ -282,6 +280,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
   }
 
   /**
+   * Triggers a call to check the idleness of the Evaluator.
+   */
+  void checkIdlenessSource() {
+    this.idlenessSource.check();
+  }
+
+  /**
    * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
    *
    * @param exception on the EvaluatorRuntime

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
new file mode 100644
index 0000000..0dce5c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of Threads in a Driver to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+@NamedParameter(doc = "The number of Threads in a Driver to verify the completion of Evaluators.",
+    default_value = "5")
+public final class EvaluatorIdlenessThreadPoolSize implements Name<Integer> {
+  private EvaluatorIdlenessThreadPoolSize(){
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
new file mode 100644
index 0000000..a0168c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of milliseconds to wait in a loop to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+@NamedParameter(doc = "The number of milliseconds to wait in a loop to verify the completion of Evaluators.",
+    default_value = "500")
+public final class EvaluatorIdlenessWaitInMilliseconds implements Name<Long> {
+  private EvaluatorIdlenessWaitInMilliseconds(){
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 36c0aee..03c1463 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -114,7 +114,7 @@ public final class DispatchingEStage implements AutoCloseable {
    * Return true if there are no messages queued or in processing, false otherwise.
    */
   public boolean isEmpty() {
-    return this.stage.getQueueLength() == 0;
+    return this.stage.getQueueLength() + this.stage.getActiveCount() == 0;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 3d46e37..3b39c8a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -174,24 +174,32 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
   @SuppressWarnings("checkstyle:illegalcatch")
   public void onNext(final T value) {
     beforeOnNext();
-    executor.submit(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          handler.onNext(value);
-          afterOnNext();
-        } catch (final Throwable t) {
-          if (errorHandler != null) {
-            errorHandler.onNext(t);
-          } else {
-            LOG.log(Level.SEVERE, name + " Exception from event handler", t);
-            throw t;
+    try {
+      executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            handler.onNext(value);
+          } catch (final Throwable t) {
+            if (errorHandler != null) {
+              errorHandler.onNext(t);
+            } else {
+              LOG.log(Level.SEVERE, name + " Exception from event handler", t);
+              throw t;
+            }
+          } finally {
+            afterOnNext();
           }
         }
-      }
 
-    });
+      });
+    } catch (final Exception e) {
+      LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage.");
+      afterOnNext();
+      throw e;
+    }
+
   }
 
   /**
@@ -218,4 +226,11 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
     return ((ThreadPoolExecutor) executor).getQueue().size();
   }
 
+  /**
+   * Gets the active count of this stage.
+   * @return the active count
+   */
+  public int getActiveCount() {
+    return (int)(getInMeter().getCount() - getOutMeter().getCount());
+  }
 }