You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/26 12:11:42 UTC

[2/2] flink git commit: [FLINK-4672] [taskmanager] Do not decorate Actor Kill messages

[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f237cfe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f237cfe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f237cfe

Branch: refs/heads/master
Commit: 6f237cfe6f70b5b72fedd3dea6fbeb6c929631e8
Parents: 28ff5a3
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 23 18:42:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 14:11:05 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 33 ++++++++++++++++++--
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f237cfe/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9e2feb5..04f3168 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1379,7 +1379,7 @@ class TaskManager(
       "\n" +
       "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
 
-    self ! decorateMessage(Kill)
+    self ! Kill
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6f237cfe/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0e53673..0774fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -1367,6 +1369,28 @@ public class TaskManagerTest extends TestLogger {
 		}};
 	}
 
+	@Test
+	public void testTerminationOnFatalError() {
+		new JavaTestKit(system){{
+
+			final ActorGateway taskManager = TestingUtils.createTaskManager(
+					system,
+					system.deadLetters(), // no jobmanager
+					new Configuration(),
+					true,
+					false);
+
+			try {
+				watch(taskManager.actor());
+				taskManager.tell(new FatalError("test fatal error", new Exception("something super bad")));
+				expectTerminated(d, taskManager.actor());
+			}
+			finally {
+				taskManager.tell(Kill.getInstance());
+			}
+		}};
+	}
+	
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends FlinkUntypedActor {
@@ -1547,11 +1571,14 @@ public class TaskManagerTest extends TestLogger {
 
 		@Override
 		public void invoke() throws Exception {
-			Object o = new Object();
+			final Object o = new Object();
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
 			synchronized (o) {
-				o.wait();
+				//noinspection InfiniteLoopStatement
+				while (true) {
+					o.wait();
+				}
 			}
 		}
 	}
-
 }