You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/26 09:25:24 UTC
flink git commit: [FLINK-4672] [taskmanager] Do not decorate Actor Kill messages
Repository: flink
Updated Branches:
refs/heads/release-1.1 62c666f57 -> caa0fbb21
[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa0fbb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa0fbb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa0fbb2
Branch: refs/heads/release-1.1
Commit: caa0fbb2157de56c9bdc4bbf8aedb73df90edede
Parents: 62c666f
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 23 18:42:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 11:25:01 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../runtime/taskmanager/TaskManagerTest.java | 33 ++++++++++++++++++--
2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..8e787bb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1333,7 +1333,7 @@ class TaskManager(
"\n" +
"A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
- self ! decorateMessage(Kill)
+ self ! Kill
}
override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index ce88c09..1c50265 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.Kill;
import akka.actor.Props;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -1369,6 +1371,28 @@ public class TaskManagerTest extends TestLogger {
}};
}
+ @Test
+ public void testTerminationOnFatalError() {
+ new JavaTestKit(system){{
+
+ final ActorGateway taskManager = TestingUtils.createTaskManager(
+ system,
+ system.deadLetters(), // no jobmanager
+ new Configuration(),
+ true,
+ false);
+
+ try {
+ watch(taskManager.actor());
+ taskManager.tell(new FatalError("test fatal error", new Exception("something super bad")));
+ expectTerminated(d, taskManager.actor());
+ }
+ finally {
+ taskManager.tell(Kill.getInstance());
+ }
+ }};
+ }
+
// --------------------------------------------------------------------------------------------
public static class SimpleJobManager extends FlinkUntypedActor {
@@ -1549,11 +1573,14 @@ public class TaskManagerTest extends TestLogger {
@Override
public void invoke() throws Exception {
- Object o = new Object();
+ final Object o = new Object();
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (o) {
- o.wait();
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ o.wait();
+ }
}
}
}
-
}