You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 18:36:58 UTC

[4/4] flink git commit: [FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.

[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425

Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala     | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
 		if (message instanceof Terminated) {
 			try {
 				Terminated term = (Terminated) message;
-				String name = term.actor().path().name();
+				String name = term.actor().path().toSerializationFormat();
 				if (log != null) {
 					log.error("Actor " + name + " terminated, stopping process...");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
-        TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+        val taskManagerActor = TaskManager.startTaskManagerActor(
+          configuration, jobManagerSystem, listeningAddress,
           TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+        LOG.debug("Starting TaskManager process reaper")
+        jobManagerSystem.actorOf(
+          Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+          "TaskManager_Process_Reaper")
       }
 
       // start the job manager web frontend