You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/02 19:50:17 UTC
reef git commit: [REEF-1658] Gracefully shut down all threads at the end of the REEF job
Repository: reef
Updated Branches:
refs/heads/master 220d75cac -> 55e0cfc85
[REEF-1658] Gracefully shut down all threads at the end of the REEF job
This work is part of the "REEF as a library" project
[REEF-1561](https://issues.apache.org/jira/browse/REEF-1561)
Summary of changes:
* Close the `EvaluatorIdlenessThreadPool` in `DriverRuntimeStopHandler`
* Make `EvaluatorIdlenessThreadPool` `AutoCloseable` to close the local thread
pool
* Implement `EvaluatorManager.shutdown()` method and call it when removing
evaluator from `Evaluators`
* Make sure `.close()` methods never throw in `DispatchingEStage` and
`EvaluatorMessageDispatcher`
* Give better thread names in `EvaluatorIdlenessThreadPool`
* Improve logging and error handling in `.close()` methods on all paths of the
driver shutdown
JIRA:
[REEF-1658](https://issues.apache.org/jira/browse/REEF-1658)
Pull Request:
This closes #1175
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/55e0cfc8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/55e0cfc8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/55e0cfc8
Branch: refs/heads/master
Commit: 55e0cfc856c4f3940bedd6539de67a709e723c00
Parents: 220d75c
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Mon Oct 31 22:34:15 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Nov 2 12:48:44 2016 -0700
----------------------------------------------------------------------
.../common/driver/DriverRuntimeStopHandler.java | 20 +++++--
.../evaluator/EvaluatorIdlenessThreadPool.java | 63 ++++++++++++++++++--
.../driver/evaluator/EvaluatorManager.java | 13 +++-
.../evaluator/EvaluatorMessageDispatcher.java | 2 +-
.../common/driver/evaluator/Evaluators.java | 25 ++++++--
.../runtime/common/utils/DispatchingEStage.java | 4 +-
6 files changed, 106 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
index d344cf3..0919b77 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -24,6 +24,7 @@ import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool;
import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
@@ -50,6 +51,7 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
private final ResourceManagerStopHandler resourceManagerStopHandler;
private final RemoteManager remoteManager;
private final Evaluators evaluators;
+ private final EvaluatorIdlenessThreadPool idlenessChecker;
private final boolean preserveEvaluatorsAcrossRestarts;
@Inject
@@ -59,20 +61,22 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
final DriverStatusManager driverStatusManager,
final ResourceManagerStopHandler resourceManagerStopHandler,
final RemoteManager remoteManager,
- final Evaluators evaluators) {
+ final Evaluators evaluators,
+ final EvaluatorIdlenessThreadPool idlenessChecker) {
this.driverRestartManager = driverRestartManager;
this.driverStatusManager = driverStatusManager;
this.resourceManagerStopHandler = resourceManagerStopHandler;
this.remoteManager = remoteManager;
this.evaluators = evaluators;
+ this.idlenessChecker = idlenessChecker;
this.preserveEvaluatorsAcrossRestarts = preserveEvaluatorsAcrossRestarts;
}
@Override
public synchronized void onNext(final RuntimeStop runtimeStop) {
- LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
+ LOG.log(Level.FINE, "Driver shutdown: start {0}", runtimeStop);
final Throwable runtimeException = runtimeStop.getException();
@@ -81,23 +85,29 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
if (runtimeException == null ||
runtimeException instanceof DriverFatalRuntimeException ||
!this.preserveEvaluatorsAcrossRestarts) {
+ LOG.log(Level.FINER, "Driver shutdown: close the evaluators");
this.evaluators.close();
}
this.resourceManagerStopHandler.onNext(runtimeStop);
- // Inform the client of the shutdown.
+ LOG.log(Level.FINER, "Driver shutdown: notify the client");
this.driverStatusManager.onRuntimeStop(Optional.ofNullable(runtimeException));
- // Close the remoteManager.
try {
+ LOG.log(Level.FINER, "Driver shutdown: close the remote manager");
this.remoteManager.close();
- LOG.log(Level.INFO, "Driver shutdown complete");
} catch (final Exception e) {
LOG.log(Level.WARNING, "Error when closing the RemoteManager", e);
throw new RuntimeException("Unable to close the RemoteManager.", e);
}
+ LOG.log(Level.FINER, "Driver shutdown: close the restart manager");
this.driverRestartManager.close();
+
+ LOG.log(Level.FINER, "Driver shutdown: close the idleness checker");
+ this.idlenessChecker.close();
+
+ LOG.log(Level.INFO, "Driver shutdown complete");
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
index e94ab4c..e689504 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
@@ -26,8 +26,10 @@ import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import javax.inject.Inject;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,22 +38,25 @@ import java.util.logging.Logger;
* of an {@link EvaluatorManager} in order to trigger Evaluator idleness checks.
*/
@Private
-public final class EvaluatorIdlenessThreadPool {
+public final class EvaluatorIdlenessThreadPool implements AutoCloseable {
+
private static final Logger LOG = Logger.getLogger(EvaluatorIdlenessThreadPool.class.getName());
private final ExecutorService executor;
private final long waitInMillis;
@Inject
- private EvaluatorIdlenessThreadPool(@Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
- @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
+ private EvaluatorIdlenessThreadPool(
+ @Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
+ @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
Validate.isTrue(waitInMillis >= 0, "EvaluatorIdlenessWaitInMilliseconds must be configured to be >= 0");
Validate.isTrue(numThreads > 0, "EvaluatorIdlenessThreadPoolSize must be configured to be > 0");
this.waitInMillis = waitInMillis;
+
this.executor = Executors.newFixedThreadPool(
- numThreads, new DefaultThreadFactory(EvaluatorIdlenessThreadPool.class.getName()));
+ numThreads, new DefaultThreadFactory(this.getClass().getSimpleName()));
}
/**
@@ -60,12 +65,26 @@ public final class EvaluatorIdlenessThreadPool {
* @param manager the {@link EvaluatorManager}
*/
void runCheckAsync(final EvaluatorManager manager) {
- executor.submit(new Runnable() {
+
+ final String evaluatorId = manager.getId();
+ LOG.log(Level.FINEST, "Idle check for Evaluator: {0}", manager);
+
+ this.executor.submit(new Runnable() {
+
@Override
public void run() {
+
+ LOG.log(Level.FINEST, "Idle check for Evaluator {0} - begin", evaluatorId);
+
while (!manager.isClosed()) {
try {
+
+ LOG.log(Level.FINEST,
+ "Waiting for Evaluator {0} to close: Sleep for {1} ms",
+ new Object[] {evaluatorId, waitInMillis});
+
Thread.sleep(waitInMillis);
+
} catch (final InterruptedException e) {
LOG.log(Level.SEVERE, "Thread interrupted while waiting for Evaluator to finish.");
throw new RuntimeException(e);
@@ -73,8 +92,40 @@ public final class EvaluatorIdlenessThreadPool {
}
manager.checkIdlenessSource();
- LOG.log(Level.FINE, "Evaluator " + manager.getId() + " has finished.");
+
+ LOG.log(Level.FINEST, "Idle check for Evaluator {0} - end", evaluatorId);
+ }
+
+ @Override
+ public String toString() {
+ return "CheckIdle: " + evaluatorId;
}
});
}
+
+ /**
+ * Shutdown the thread pool of idleness checkers.
+ */
+ @Override
+ public void close() {
+
+ LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: begin");
+
+ this.executor.shutdown();
+
+ boolean isTerminated = false;
+ try {
+ isTerminated = this.executor.awaitTermination(this.waitInMillis, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.WARNING, "EvaluatorIdlenessThreadPool shutdown: Interrupted", ex);
+ }
+
+ if (isTerminated) {
+ LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: Terminated successfully");
+ } else {
+ final List<Runnable> pendingJobs = this.executor.shutdownNow();
+ LOG.log(Level.SEVERE, "EvaluatorIdlenessThreadPool shutdown: {0} jobs after timeout", pendingJobs.size());
+ LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: pending jobs: {0}", pendingJobs);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index fc77380..26af25f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -219,6 +219,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
@Override
public void close() {
+ LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId);
+
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isAvailable()) {
@@ -275,6 +277,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
}
this.idlenessThreadPool.runCheckAsync(this);
+
+ LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId);
+ }
+
+ /**
+ * Close message dispatcher for the evaluator.
+ */
+ public void shutdown() {
+ this.messageDispatcher.close();
}
/**
@@ -357,7 +368,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
} finally {
this.stateManager.setFailed();
- close();
+ this.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 18e868a..ce879da 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -286,7 +286,7 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable {
}
@Override
- public void close() throws Exception {
+ public void close() {
LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
// This effectively closes all dispatchers as they share the same stage.
this.serviceDispatcher.close();
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 68a122d..2e6974b 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -65,16 +65,22 @@ public final class Evaluators implements AutoCloseable {
*/
@Override
public void close() {
+
+ LOG.log(Level.FINER, "Closing the evaluators - begin");
+
final List<EvaluatorManager> evaluatorsCopy;
synchronized (this) {
evaluatorsCopy = new ArrayList<>(this.evaluators.values());
}
+
for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
if (!evaluatorManager.isClosedOrClosing()) {
LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
evaluatorManager.close();
}
}
+
+ LOG.log(Level.FINER, "Closing the evaluators - end");
}
/**
@@ -148,19 +154,28 @@ public final class Evaluators implements AutoCloseable {
* Moves evaluator from map of active evaluators to set of closed evaluators.
*/
public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
+
final String evaluatorId = evaluatorManager.getId();
+
if (!evaluatorManager.isClosed()) {
throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet.");
}
+
if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + ".");
}
+
if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
- LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed.");
- } else {
- LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + ".");
- this.evaluators.remove(evaluatorId);
- this.closedEvaluatorIds.add(evaluatorId);
+ LOG.log(Level.FINE, "Trying to remove closed evaluator {0} which has already been removed.", evaluatorId);
+ return;
}
+
+ LOG.log(Level.FINE, "Removing closed evaluator {0}", evaluatorId);
+
+ evaluatorManager.shutdown();
+ this.evaluators.remove(evaluatorId);
+ this.closedEvaluatorIds.add(evaluatorId);
+
+ LOG.log(Level.FINEST, "Evaluator {0} removed", evaluatorId);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 03c1463..3a65df9 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -119,11 +119,9 @@ public final class DispatchingEStage implements AutoCloseable {
/**
* Close the internal thread pool.
- *
- * @throws Exception forwarded from EStage.close() call.
*/
@Override
- public void close() throws Exception {
+ public void close() {
this.stage.close();
}