You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/29 18:36:58 UTC
[4/4] flink git commit: [FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.
[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425
Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 8 +++++++-
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
if (message instanceof Terminated) {
try {
Terminated term = (Terminated) message;
- String name = term.actor().path().name();
+ String name = term.actor().path().toSerializationFormat();
if (log != null) {
log.error("Actor " + name + " terminated, stopping process...");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
if (executionMode == JobManagerMode.LOCAL) {
LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
- TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+ val taskManagerActor = TaskManager.startTaskManagerActor(
+ configuration, jobManagerSystem, listeningAddress,
TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+ LOG.debug("Starting TaskManager process reaper")
+ jobManagerSystem.actorOf(
+ Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ "TaskManager_Process_Reaper")
}
// start the job manager web frontend