You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/11/16 22:33:18 UTC

incubator-reef git commit: [REEF-935] Close EvaluatorMessageDispatcher upon Evaluator completion and failure

Repository: incubator-reef
Updated Branches:
  refs/heads/master d55433956 -> 885046818


[REEF-935] Close EvaluatorMessageDispatcher upon Evaluator completion and failure

This addressed the issue by
  * Making EvaluatorMessageDispatcher AutoCloseable
  * Closing the dispatcher in appropriate methods of EvaluatorManager

JIRA:
  [REEF-935](https://issues.apache.org/jira/browse/REEF-935)

Pull Request:
  This closes #629


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/88504681
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/88504681
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/88504681

Branch: refs/heads/master
Commit: 885046818f9937dc0ced221e7ef8099088f6e90a
Parents: d554339
Author: John Yang <jo...@apache.org>
Authored: Wed Nov 11 20:17:33 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Nov 16 13:30:36 2015 -0800

----------------------------------------------------------------------
 .../runtime/common/driver/evaluator/EvaluatorManager.java |  6 ++++++
 .../driver/evaluator/EvaluatorMessageDispatcher.java      | 10 +++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/88504681/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 708cbdc..7a92dbc 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -253,6 +253,12 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
         }
       }
     }
+
+    try {
+      this.messageDispatcher.close();
+    } catch (Exception e) {
+      LOG.log(Level.SEVERE, "Exception while closing EvaluatorManager", e);
+    }
     this.idlenessSource.check();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/88504681/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index aa5f43b..fc3a83b 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -41,7 +41,7 @@ import java.util.logging.Logger;
 /**
  * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
  */
-public final class EvaluatorMessageDispatcher {
+public final class EvaluatorMessageDispatcher implements AutoCloseable {
 
   private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
 
@@ -279,4 +279,12 @@ public final class EvaluatorMessageDispatcher {
     this.driverRestartServiceDispatcher.onNext(type, message);
     this.driverRestartApplicationDispatcher.onNext(type, message);
   }
+
+  @Override
+  public void close() throws Exception {
+    /**
+     * This effectively closes all dispatchers as they share the same stage.
+     */
+    this.serviceDispatcher.close();
+  }
 }