Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/30 17:54:35 UTC

flink git commit: [FLINK-7279] [minicluster] call fatal error handlers asynchronously

Repository: flink
Updated Branches:
  refs/heads/master a954ea113 -> 49acd09ec


[FLINK-7279] [minicluster] call fatal error handlers asynchronously

This fixes a deadlock between TM and cluster shutdown:

The MiniCluster can deadlock if the fatal error handler is called while the
MiniCluster is shutting down. The shutdown runs under a lock that the fatal
error handler also requires. When the MiniCluster then shuts down the
underlying RPC service, which waits for all actors to terminate, the shutdown
never completes because one actor is still waiting for the lock.

Solution: call fatal error handlers in TaskExecutor asynchronously so that the
RPC service does not block on them. This is more future-proof than fixing the
locking problem itself, which may eventually recur.
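
The locking scenario described above can be reproduced in isolation. The
following is only a minimal sketch, not Flink code: the class
AsyncFatalErrorSketch, its FatalErrorHandler interface, the two executors
(stand-ins for an actor thread and for the RPC service's executor), and the
one-second timeout are all assumptions made for illustration. The timeout
replaces the real unbounded wait so that the blocking case ends instead of
hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncFatalErrorSketch {

    /** Hypothetical stand-in for a fatal error handler; not the Flink interface. */
    interface FatalErrorHandler {
        void onFatalError(Throwable t);
    }

    /**
     * Simulates a shutdown that holds a lock while waiting for an "actor" to finish.
     * Returns true if the actor finished while the lock was still held.
     */
    static boolean actorFinishesDuringShutdown(boolean async) {
        final Object lock = new Object();
        // Assumption: one thread plays the actor, one plays the RPC service executor.
        ExecutorService actor = Executors.newSingleThreadExecutor();
        ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();

        // The handler needs the same lock that the shutdown path holds.
        FatalErrorHandler handler = t -> {
            synchronized (lock) {
                // e.g. shut down the failed component
            }
        };

        CountDownLatch actorDone = new CountDownLatch(1);
        boolean finished;
        try {
            synchronized (lock) { // the shutdown path takes the lock ...
                actor.execute(() -> {
                    Throwable error = new RuntimeException("simulated fatal error");
                    if (async) {
                        // Hand the handler call off; the actor does not wait for the lock.
                        rpcExecutor.execute(() -> handler.onFatalError(error));
                    } else {
                        // Direct call: the actor blocks until the lock is released.
                        handler.onFatalError(error);
                    }
                    actorDone.countDown();
                });
                // ... and waits for the actor (bounded here to keep the sketch finite).
                finished = actorDone.await(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            // Already submitted tasks still run to completion after shutdown().
            actor.shutdown();
            rpcExecutor.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("synchronous call, actor finished: "
                + actorFinishesDuringShutdown(false));
        System.out.println("asynchronous call, actor finished: "
                + actorFinishesDuringShutdown(true));
    }
}
```

With the direct call, the actor cannot finish until the simulated shutdown
gives up and releases the lock. With the asynchronous call, the actor returns
immediately and the handler runs once the lock becomes free.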

This closes #4416.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49acd09e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49acd09e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49acd09e

Branch: refs/heads/master
Commit: 49acd09ec19b5da3f6e9861d658728ec1306e3d5
Parents: a954ea1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Jul 28 17:59:03 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jul 30 19:54:25 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/minicluster/MiniCluster.java |  3 ++-
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java   | 10 ++++++++--
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49acd09e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 8841f68..0fa8f8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -649,7 +649,8 @@ public class MiniCluster {
 
 			try {
 				synchronized (lock) {
-					if (taskManagers[index] != null) {
+					// note: if not running (after shutdown) taskManagers may be null!
+					if (running && taskManagers[index] != null) {
 						taskManagers[index].shutDown();
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/49acd09e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 13153c4..8038228 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1124,9 +1124,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 *
 	 * @param t The exception describing the fatal error
 	 */
-	void onFatalError(Throwable t) {
+	void onFatalError(final Throwable t) {
 		log.error("Fatal error occurred.", t);
-		fatalErrorHandler.onFatalError(t);
+		// this could potentially be a blocking call -> call asynchronously:
+		getRpcService().execute(new Runnable() {
+			@Override
+			public void run() {
+				fatalErrorHandler.onFatalError(t);
+			}
+		});
 	}
 
 	// ------------------------------------------------------------------------