You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/05/27 09:57:39 UTC
[2/3] flink git commit: [FLINK-2079] Add TaskManager deathwatch thread for YARN case
[FLINK-2079] Add TaskManager deathwatch thread for YARN case
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11b021b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11b021b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11b021b0
Branch: refs/heads/master
Commit: 11b021b0fb36503c06596323b39d531225057f1e
Parents: b2b0fe7
Author: Robert Metzger <rm...@apache.org>
Authored: Fri May 22 13:51:02 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed May 27 09:56:54 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/configuration/ConfigConstants.java | 5 +++++
.../org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
.../scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 9 ++++++++-
.../org/apache/flink/runtime/taskmanager/TaskManager.scala | 5 ++---
.../apache/flink/yarn/appMaster/YarnTaskManagerRunner.java | 3 +++
5 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 26d4fbe..92acd3f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -406,6 +406,11 @@ public final class ConfigConstants {
* Timeout for all blocking calls that look up remote actors
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
+
+ /**
+ * Exit JVM on fatal Akka errors
+ */
+ public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
// ----------------------------- Streaming --------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index 644d7b7..09e1839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -56,7 +56,7 @@ public class ProcessReaper extends UntypedActor {
Thread.sleep(100);
}
catch (InterruptedException e) {
- // not really problem if we don't sleep...
+ // not really a problem if we don't sleep...
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5b33017..7ffaddd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -139,6 +139,13 @@ object AkkaUtils {
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+ val jvmExitOnFatalError = if (
+ configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, false)){
+ "on"
+ } else {
+ "off"
+ }
+
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
val logLevel = getLogLevel
@@ -152,7 +159,7 @@ object AkkaUtils {
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| log-config-on-start = off
|
- | jvm-exit-on-fatal-error = off
+ | jvm-exit-on-fatal-error = $jvmExitOnFatalError
|
| serialize-messages = off
|
http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8a45fa4..7bf5bc5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -158,7 +158,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
private var heartbeatScheduler: Option[Cancellable] = None
-
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
@@ -192,7 +191,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
*/
override def postStop(): Unit = {
log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
-
+
cancelAndClearEverything(new Exception("TaskManager is shutting down."))
if (isConnected) {
@@ -1289,7 +1288,7 @@ object TaskManager {
streamingMode,
taskManagerClass)
- // start a process reaper that watches the JobManager. If the JobManager actor dies,
+ // start a process reaper that watches the TaskManager. If the TaskManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
LOG.debug("Starting TaskManager process reaper")
taskManagerSystem.actorOf(
http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 564a0bd..3f13990 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -85,6 +85,9 @@ public class YarnTaskManagerRunner {
LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName()
+"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+ // tell Akka to exit the JVM on fatal errors
+ configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
ugi.addToken(toks);