You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/05/27 09:57:39 UTC

[2/3] flink git commit: [FLINK-2079] Add TaskManager deathwatch thread for YARN case

[FLINK-2079] Add TaskManager deathwatch thread for YARN case


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11b021b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11b021b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11b021b0

Branch: refs/heads/master
Commit: 11b021b0fb36503c06596323b39d531225057f1e
Parents: b2b0fe7
Author: Robert Metzger <rm...@apache.org>
Authored: Fri May 22 13:51:02 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed May 27 09:56:54 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/configuration/ConfigConstants.java     | 5 +++++
 .../org/apache/flink/runtime/process/ProcessReaper.java     | 2 +-
 .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala     | 9 ++++++++-
 .../org/apache/flink/runtime/taskmanager/TaskManager.scala  | 5 ++---
 .../apache/flink/yarn/appMaster/YarnTaskManagerRunner.java  | 3 +++
 5 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 26d4fbe..92acd3f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -406,6 +406,11 @@ public final class ConfigConstants {
 	 * Timeout for all blocking calls that look up remote actors
 	 */
 	public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
+
+	/**
+	 * Exit JVM on fatal Akka errors
+	 */
+	public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
 	
 	// ----------------------------- Streaming --------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index 644d7b7..09e1839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -56,7 +56,7 @@ public class ProcessReaper extends UntypedActor {
 						Thread.sleep(100);
 					}
 					catch (InterruptedException e) {
-						// not really problem if we don't sleep...
+						// not really a problem if we don't sleep...
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5b33017..7ffaddd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -139,6 +139,13 @@ object AkkaUtils {
     val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
       ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
 
+    val jvmExitOnFatalError = if (
+      configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, false)){
+      "on"
+    } else {
+      "off"
+    }
+
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
     val logLevel = getLogLevel
@@ -152,7 +159,7 @@ object AkkaUtils {
         | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
         | log-config-on-start = off
         |
-        | jvm-exit-on-fatal-error = off
+        | jvm-exit-on-fatal-error = $jvmExitOnFatalError
         |
         | serialize-messages = off
         |

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8a45fa4..7bf5bc5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -158,7 +158,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
-
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -192,7 +191,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    */
   override def postStop(): Unit = {
     log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
-
+    
     cancelAndClearEverything(new Exception("TaskManager is shutting down."))
 
     if (isConnected) {
@@ -1289,7 +1288,7 @@ object TaskManager {
                                                            streamingMode,
                                                            taskManagerClass)
 
-      // start a process reaper that watches the JobManager. If the JobManager actor dies,
+      // start a process reaper that watches the JobManager. If the TaskManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
       LOG.debug("Starting TaskManager process reaper")
       taskManagerSystem.actorOf(

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 564a0bd..3f13990 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -85,6 +85,9 @@ public class YarnTaskManagerRunner {
 		LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName()
 				+"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
 
+		// tell akka to die in case of an error
+		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
 		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
 		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
 			ugi.addToken(toks);