You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/11 17:24:20 UTC
reef git commit: [REEF-1250] Fix memory leak in Evaluators
Repository: reef
Updated Branches:
refs/heads/master c67391a1d -> ac6785db4
[REEF-1250] Fix memory leak in Evaluators
This change:
* introduces a set of closed evaluator ids,
* moves an evaluator to this set whenever the resource manager
sends a resource status indicating that the evaluator is closed.
JIRA:
[REEF-1250](https://issues.apache.org/jira/browse/REEF-1250)
Pull request:
This closes #995
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ac6785db
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ac6785db
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ac6785db
Branch: refs/heads/master
Commit: ac6785db44d9a1aecb819eba96558efdaae4e570
Parents: c67391a
Author: Mariia Mykhailova <ma...@apache.org>
Authored: Mon May 9 15:35:24 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed May 11 10:21:54 2016 -0700
----------------------------------------------------------------------
.../evaluator/EvaluatorHeartbeatHandler.java | 5 +++
.../EvaluatorResourceManagerErrorHandler.java | 6 ++-
.../common/driver/evaluator/Evaluators.java | 44 +++++++++++++++++---
.../resourcemanager/ResourceStatusHandler.java | 16 ++++++-
4 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index 9011d7f..aedbc5a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -71,6 +71,11 @@ public final class EvaluatorHeartbeatHandler
return;
}
+ if (this.evaluators.wasClosed(evaluatorId)) {
+ LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has reported back to the driver after it was closed.");
+ return;
+ }
+
if (driverRestartManager.isRestarting() &&
driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) {
http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
index a55fec3..3a188e0 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
@@ -62,7 +62,11 @@ public final class EvaluatorResourceManagerErrorHandler
if (evaluatorManager.isPresent()) {
evaluatorManager.get().onEvaluatorException(evaluatorException);
} else {
- LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+ if (this.evaluators.wasClosed(evaluatorId)) {
+ LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has raised exception after it was closed.");
+ } else {
+ LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 59ee6a8..5bc0645 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -23,12 +23,10 @@ import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.util.Optional;
import org.apache.reef.util.SingletonAsserter;
+import org.apache.reef.tang.util.MonotonicSet;
import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,6 +45,10 @@ public final class Evaluators implements AutoCloseable {
*/
private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
+ /**
+ * A set of evaluatorIds for "closed" (failed and returned) evaluators.
+ */
+ private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>();
@Inject
Evaluators() {
@@ -95,6 +97,14 @@ public final class Evaluators implements AutoCloseable {
}
/**
+ * @param evaluatorId
+ * @return true if evaluator with this id has already been closed.
+ */
+ public synchronized boolean wasClosed(final String evaluatorId) {
+ return this.closedEvaluatorIds.contains(evaluatorId);
+ }
+
+ /**
* Create new EvaluatorManager and add it to the collection.
* <p>
* FIXME: This method is a temporary fix for the race condition
@@ -118,11 +128,35 @@ public final class Evaluators implements AutoCloseable {
*/
public synchronized void put(final EvaluatorManager evaluatorManager) {
final String evaluatorId = evaluatorManager.getId();
+ if (this.wasClosed(evaluatorId)) {
+ throw new IllegalArgumentException(
+ "Trying to re-add an Evaluator that has already been closed: " + evaluatorId);
+ }
final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
if (prev != null) {
throw new IllegalArgumentException(
- "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+ "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+ }
+ }
+
+ /**
+ * Moves evaluator from map of active evaluators to set of closed evaluators.
+ */
+ public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
+ final String evaluatorId = evaluatorManager.getId();
+ if (!evaluatorManager.isClosed()) {
+ throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet.");
+ }
+ if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
+ throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + ".");
+ }
+ if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
+ LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed.");
+ } else {
+ LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + ".");
+ this.evaluators.remove(evaluatorId);
+ this.closedEvaluatorIds.add(evaluatorId);
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 1a2cb38..50ee841 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -28,6 +28,8 @@ import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
@@ -35,6 +37,7 @@ import javax.inject.Inject;
*/
@Private
public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> {
+ private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
private final Evaluators evaluators;
private final EvaluatorManagerFactory evaluatorManagerFactory;
@@ -50,8 +53,8 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE
}
/**
- * This resource status message comes from the ResourceManager layer; telling me what it thinks.
- * about the state of the resource executing an Evaluator; This method simply passes the message
+ * This resource status message comes from the ResourceManager layer, telling me what it thinks
+ * about the state of the resource executing an Evaluator. This method simply passes the message
* off to the referenced EvaluatorManager
*
* @param resourceStatusEvent resource status message from the ResourceManager
@@ -61,7 +64,16 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE
final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier());
if (evaluatorManager.isPresent()) {
evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
+
+ if (evaluatorManager.get().isClosed()) {
+ this.evaluators.removeClosedEvaluator(evaluatorManager.get());
+ }
} else {
+ if (this.evaluators.wasClosed(resourceStatusEvent.getIdentifier())) {
+ LOG.log(Level.WARNING, "Unexpected resource status from closed evaluator " +
+ resourceStatusEvent.getIdentifier() + " with state " + resourceStatusEvent.getState());
+ }
+
if (driverRestartManager.get().getEvaluatorRestartState(resourceStatusEvent.getIdentifier())
.isFailedOrExpired()) {
final EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory