You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/11 17:24:20 UTC

reef git commit: [REEF-1250] Fix memory leak in Evaluators

Repository: reef
Updated Branches:
  refs/heads/master c67391a1d -> ac6785db4


[REEF-1250] Fix memory leak in Evaluators

This change:
 * introduces a set of closed evaluator ids,
 * moves an evaluator to this set whenever the resource manager
   sends a resource status indicating that the evaluator is closed.

JIRA:
  [REEF-1250](https://issues.apache.org/jira/browse/REEF-1250)

Pull request:
  This closes #995


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ac6785db
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ac6785db
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ac6785db

Branch: refs/heads/master
Commit: ac6785db44d9a1aecb819eba96558efdaae4e570
Parents: c67391a
Author: Mariia Mykhailova <ma...@apache.org>
Authored: Mon May 9 15:35:24 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed May 11 10:21:54 2016 -0700

----------------------------------------------------------------------
 .../evaluator/EvaluatorHeartbeatHandler.java    |  5 +++
 .../EvaluatorResourceManagerErrorHandler.java   |  6 ++-
 .../common/driver/evaluator/Evaluators.java     | 44 +++++++++++++++++---
 .../resourcemanager/ResourceStatusHandler.java  | 16 ++++++-
 4 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
index 9011d7f..aedbc5a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -71,6 +71,11 @@ public final class EvaluatorHeartbeatHandler
         return;
       }
 
+      if (this.evaluators.wasClosed(evaluatorId)) {
+        LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has reported back to the driver after it was closed.");
+        return;
+      }
+
       if (driverRestartManager.isRestarting() &&
           driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) {
 

http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
index a55fec3..3a188e0 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
@@ -62,7 +62,11 @@ public final class EvaluatorResourceManagerErrorHandler
     if (evaluatorManager.isPresent()) {
       evaluatorManager.get().onEvaluatorException(evaluatorException);
     } else {
-      LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+      if (this.evaluators.wasClosed(evaluatorId)) {
+        LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has raised exception after it was closed.");
+      } else {
+        LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index 59ee6a8..5bc0645 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -23,12 +23,10 @@ import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
 import org.apache.reef.util.Optional;
 import org.apache.reef.util.SingletonAsserter;
+import org.apache.reef.tang.util.MonotonicSet;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -47,6 +45,10 @@ public final class Evaluators implements AutoCloseable {
    */
   private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
 
+  /**
+   * A set of evaluatorIds for "closed" (failed and returned) evaluators.
+   */
+  private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>();
 
   @Inject
   Evaluators() {
@@ -95,6 +97,14 @@ public final class Evaluators implements AutoCloseable {
   }
 
   /**
+   * @param evaluatorId
+   * @return true if evaluator with this id has already been closed.
+   */
+  public synchronized boolean wasClosed(final String evaluatorId) {
+    return this.closedEvaluatorIds.contains(evaluatorId);
+  }
+
+  /**
    * Create new EvaluatorManager and add it to the collection.
    * <p>
    * FIXME: This method is a temporary fix for the race condition
@@ -118,11 +128,35 @@ public final class Evaluators implements AutoCloseable {
    */
   public synchronized void put(final EvaluatorManager evaluatorManager) {
     final String evaluatorId = evaluatorManager.getId();
+    if (this.wasClosed(evaluatorId)) {
+      throw new IllegalArgumentException(
+        "Trying to re-add an Evaluator that has already been closed: " + evaluatorId);
+    }
     final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
     LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
     if (prev != null) {
       throw new IllegalArgumentException(
-          "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+        "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+    }
+  }
+
+  /**
+   * Moves evaluator from map of active evaluators to set of closed evaluators.
+   */
+  public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) {
+    final String evaluatorId = evaluatorManager.getId();
+    if (!evaluatorManager.isClosed()) {
+      throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet.");
+    }
+    if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) {
+      throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + ".");
+    }
+    if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) {
+      LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed.");
+    } else {
+      LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + ".");
+      this.evaluators.remove(evaluatorId);
+      this.closedEvaluatorIds.add(evaluatorId);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 1a2cb38..50ee841 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -28,6 +28,8 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
@@ -35,6 +37,7 @@ import javax.inject.Inject;
  */
 @Private
 public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> {
+  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
 
   private final Evaluators evaluators;
   private final EvaluatorManagerFactory evaluatorManagerFactory;
@@ -50,8 +53,8 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE
   }
 
   /**
-   * This resource status message comes from the ResourceManager layer; telling me what it thinks.
-   * about the state of the resource executing an Evaluator; This method simply passes the message
+   * This resource status message comes from the ResourceManager layer, telling me what it thinks
+   * about the state of the resource executing an Evaluator. This method simply passes the message
    * off to the referenced EvaluatorManager
    *
    * @param resourceStatusEvent resource status message from the ResourceManager
@@ -61,7 +64,16 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE
     final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier());
     if (evaluatorManager.isPresent()) {
       evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
+
+      if (evaluatorManager.get().isClosed()) {
+        this.evaluators.removeClosedEvaluator(evaluatorManager.get());
+      }
     } else {
+      if (this.evaluators.wasClosed(resourceStatusEvent.getIdentifier())) {
+        LOG.log(Level.WARNING, "Unexpected resource status from closed evaluator " +
+            resourceStatusEvent.getIdentifier() + " with state " + resourceStatusEvent.getState());
+      }
+
       if (driverRestartManager.get().getEvaluatorRestartState(resourceStatusEvent.getIdentifier())
             .isFailedOrExpired()) {
         final EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory