You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/02 19:50:17 UTC

reef git commit: [REEF-1658] Gracefully shut down all threads at the end of the REEF job

Repository: reef
Updated Branches:
  refs/heads/master 220d75cac -> 55e0cfc85


[REEF-1658] Gracefully shut down all threads at the end of the REEF job

This work is part of the "REEF as a library" project
[REEF-1561](https://issues.apache.org/jira/browse/REEF-1561)

Summary of changes:
  * Close the `EvaluatorIdlenessThreadPool` in `DriverRuntimeStopHandler`
  * Make `EvaluatorIdlenessThreadPool` `AutoCloseable` to close the local thread
    pool
  * Implement `EvaluatorManager.shutdown()` method and call it when removing
    evaluator from `Evaluators`
  * Make sure `.close()` methods never throw in `DispatchingEStage` and
    `EvaluatorMessageDispatcher`
  * Give better thread names in `EvaluatorIdlenessThreadPool`
  * Improve logging and error handling in `.close()` methods in all paths of the
    driver shutdown

JIRA:
  [REEF-1658](https://issues.apache.org/jira/browse/REEF-1658)

Pull Request:
  This closes #1175


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/55e0cfc8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/55e0cfc8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/55e0cfc8

Branch: refs/heads/master
Commit: 55e0cfc856c4f3940bedd6539de67a709e723c00
Parents: 220d75c
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Mon Oct 31 22:34:15 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Nov 2 12:48:44 2016 -0700

----------------------------------------------------------------------
 .../common/driver/DriverRuntimeStopHandler.java | 20 +++++--
 .../evaluator/EvaluatorIdlenessThreadPool.java  | 63 ++++++++++++++++++--
 .../driver/evaluator/EvaluatorManager.java      | 13 +++-
 .../evaluator/EvaluatorMessageDispatcher.java   |  2 +-
 .../common/driver/evaluator/Evaluators.java     | 25 ++++++--
 .../runtime/common/utils/DispatchingEStage.java |  4 +-
 6 files changed, 106 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
index d344cf3..0919b77 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -24,6 +24,7 @@ import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool;
 import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
 import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.tang.annotations.Parameter;
@@ -50,6 +51,7 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
   private final ResourceManagerStopHandler resourceManagerStopHandler;
   private final RemoteManager remoteManager;
   private final Evaluators evaluators;
+  private final EvaluatorIdlenessThreadPool idlenessChecker;
   private final boolean preserveEvaluatorsAcrossRestarts;
 
   @Inject
@@ -59,20 +61,22 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
       final DriverStatusManager driverStatusManager,
       final ResourceManagerStopHandler resourceManagerStopHandler,
       final RemoteManager remoteManager,
-      final Evaluators evaluators) {
+      final Evaluators evaluators,
+      final EvaluatorIdlenessThreadPool idlenessChecker) {
 
     this.driverRestartManager = driverRestartManager;
     this.driverStatusManager = driverStatusManager;
     this.resourceManagerStopHandler = resourceManagerStopHandler;
     this.remoteManager = remoteManager;
     this.evaluators = evaluators;
+    this.idlenessChecker = idlenessChecker;
     this.preserveEvaluatorsAcrossRestarts = preserveEvaluatorsAcrossRestarts;
   }
 
   @Override
   public synchronized void onNext(final RuntimeStop runtimeStop) {
 
-    LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
+    LOG.log(Level.FINE, "Driver shutdown: start {0}", runtimeStop);
 
     final Throwable runtimeException = runtimeStop.getException();
 
@@ -81,23 +85,29 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
     if (runtimeException == null ||
         runtimeException instanceof DriverFatalRuntimeException ||
         !this.preserveEvaluatorsAcrossRestarts) {
+      LOG.log(Level.FINER, "Driver shutdown: close the evaluators");
       this.evaluators.close();
     }
 
     this.resourceManagerStopHandler.onNext(runtimeStop);
 
-    // Inform the client of the shutdown.
+    LOG.log(Level.FINER, "Driver shutdown: notify the client");
     this.driverStatusManager.onRuntimeStop(Optional.ofNullable(runtimeException));
 
-    // Close the remoteManager.
     try {
+      LOG.log(Level.FINER, "Driver shutdown: close the remote manager");
       this.remoteManager.close();
-      LOG.log(Level.INFO, "Driver shutdown complete");
     } catch (final Exception e) {
       LOG.log(Level.WARNING, "Error when closing the RemoteManager", e);
       throw new RuntimeException("Unable to close the RemoteManager.", e);
     }
 
+    LOG.log(Level.FINER, "Driver shutdown: close the restart manager");
     this.driverRestartManager.close();
+
+    LOG.log(Level.FINER, "Driver shutdown: close the idleness checker");
+    this.idlenessChecker.close();
+
+    LOG.log(Level.INFO, "Driver shutdown complete");
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
index e94ab4c..e689504 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
@@ -26,8 +26,10 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.impl.DefaultThreadFactory;
 
 import javax.inject.Inject;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,22 +38,25 @@ import java.util.logging.Logger;
  * of an {@link EvaluatorManager} in order to trigger Evaluator idleness checks.
  */
 @Private
-public final class EvaluatorIdlenessThreadPool {
+public final class EvaluatorIdlenessThreadPool implements AutoCloseable {
+
   private static final Logger LOG = Logger.getLogger(EvaluatorIdlenessThreadPool.class.getName());
 
   private final ExecutorService executor;
   private final long waitInMillis;
 
   @Inject
-  private EvaluatorIdlenessThreadPool(@Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
-                                      @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
+  private EvaluatorIdlenessThreadPool(
+      @Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
+      @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
 
     Validate.isTrue(waitInMillis >= 0, "EvaluatorIdlenessWaitInMilliseconds must be configured to be >= 0");
     Validate.isTrue(numThreads > 0, "EvaluatorIdlenessThreadPoolSize must be configured to be > 0");
 
     this.waitInMillis = waitInMillis;
+
     this.executor = Executors.newFixedThreadPool(
-        numThreads, new DefaultThreadFactory(EvaluatorIdlenessThreadPool.class.getName()));
+        numThreads, new DefaultThreadFactory(this.getClass().getSimpleName()));
   }
 
   /**
@@ -60,12 +65,26 @@ public final class EvaluatorIdlenessThreadPool {
    * @param manager the {@link EvaluatorManager}
    */
   void runCheckAsync(final EvaluatorManager manager) {
-    executor.submit(new Runnable() {
+
+    final String evaluatorId = manager.getId();
+    LOG.log(Level.FINEST, "Idle check for Evaluator: {0}", manager);
+
+    this.executor.submit(new Runnable() {
+
       @Override
       public void run() {
+
+        LOG.log(Level.FINEST, "Idle check for Evaluator {0} - begin", evaluatorId);
+
         while (!manager.isClosed()) {
           try {
+
+            LOG.log(Level.FINEST,
+                "Waiting for Evaluator {0} to close: Sleep for {1} ms",
+                new Object[] {evaluatorId, waitInMillis});
+
             Thread.sleep(waitInMillis);
+
           } catch (final InterruptedException e) {
             LOG.log(Level.SEVERE, "Thread interrupted while waiting for Evaluator to finish.");
             throw new RuntimeException(e);
@@ -73,8 +92,40 @@ public final class EvaluatorIdlenessThreadPool {
         }
 
         manager.checkIdlenessSource();
-        LOG.log(Level.FINE, "Evaluator " + manager.getId() + " has finished.");
+
+        LOG.log(Level.FINEST, "Idle check for Evaluator {0} - end", evaluatorId);
+      }
+
+      @Override
+      public String toString() {
+        return "CheckIdle: " + evaluatorId;
       }
     });
   }
+
+  /**
+   * Shutdown the thread pool of idleness checkers.
+   */
+  @Override
+  public void close() {
+
+    LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: begin");
+
+    this.executor.shutdown();
+
+    boolean isTerminated = false;
+    try {
+      isTerminated = this.executor.awaitTermination(this.waitInMillis, TimeUnit.MILLISECONDS);
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.WARNING, "EvaluatorIdlenessThreadPool shutdown: Interrupted", ex);
+    }
+
+    if (isTerminated) {
+      LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: Terminated successfully");
+    } else {
+      final List<Runnable> pendingJobs = this.executor.shutdownNow();
+      LOG.log(Level.SEVERE, "EvaluatorIdlenessThreadPool shutdown: {0} jobs after timeout", pendingJobs.size());
+      LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: pending jobs: {0}", pendingJobs);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index fc77380..26af25f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -219,6 +219,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
   @Override
   public void close() {
 
+    LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId);
+
     synchronized (this.evaluatorDescriptor) {
 
       if (this.stateManager.isAvailable()) {
@@ -275,6 +277,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
     }
 
     this.idlenessThreadPool.runCheckAsync(this);
+
+    LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId);
+  }
+
+  /**
+   * Close message dispatcher for the evaluator.
+   */
+  public void shutdown() {
+    this.messageDispatcher.close();
   }
 
   /**
@@ -357,7 +368,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
         LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
       } finally {
         this.stateManager.setFailed();
-        close();
+        this.close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 18e868a..ce879da 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -286,7 +286,7 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
     // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();

http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 68a122d..2e6974b 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -65,16 +65,22 @@ public final class Evaluators implements AutoCloseable {
    */
   @Override
   public void close() {
+
+    LOG.log(Level.FINER, "Closing the evaluators - begin");
+
     final List<EvaluatorManager> evaluatorsCopy;
     synchronized (this) {
       evaluatorsCopy = new ArrayList<>(this.evaluators.values());
     }
+
     for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
       if (!evaluatorManager.isClosedOrClosing()) {
         LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
         evaluatorManager.close();
       }
     }
+
+    LOG.log(Level.FINER, "Closing the evaluators - end");
   }
 
   /**
@@ -148,19 +154,28 @@ public final class Evaluators implements AutoCloseable {
    * Moves evaluator from map of active evaluators to set of closed evaluators.
    */
   public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
+
     final String evaluatorId = evaluatorManager.getId();
+
     if (!evaluatorManager.isClosed()) {
       throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet.");
     }
+
     if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
       throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + ".");
     }
+
     if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
-      LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed.");
-    } else {
-      LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + ".");
-      this.evaluators.remove(evaluatorId);
-      this.closedEvaluatorIds.add(evaluatorId);
+      LOG.log(Level.FINE, "Trying to remove closed evaluator {0} which has already been removed.", evaluatorId);
+      return;
     }
+
+    LOG.log(Level.FINE, "Removing closed evaluator {0}", evaluatorId);
+
+    evaluatorManager.shutdown();
+    this.evaluators.remove(evaluatorId);
+    this.closedEvaluatorIds.add(evaluatorId);
+
+    LOG.log(Level.FINEST, "Evaluator {0} removed", evaluatorId);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 03c1463..3a65df9 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -119,11 +119,9 @@ public final class DispatchingEStage implements AutoCloseable {
 
   /**
    * Close the internal thread pool.
-   *
-   * @throws Exception forwarded from EStage.close() call.
    */
   @Override
-  public void close() throws Exception {
+  public void close() {
     this.stage.close();
   }