You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/12 17:46:32 UTC

flink git commit: [FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing.

Repository: flink
Updated Branches:
  refs/heads/master 9350264bd -> d0ecb9170


[FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0ecb917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0ecb917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0ecb917

Branch: refs/heads/master
Commit: d0ecb9170e5edef48c7efd95764eeec7dbdf51a8
Parents: 9350264
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 16:19:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 12 16:25:16 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0ecb917/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d9168e3..1b2fb08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -982,7 +982,7 @@ public class Task implements Runnable {
 	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
 		// make sure the executor is initialized. lock against concurrent calls to this function
 		synchronized (this) {
-			if (isCanceledOrFailed()) {
+			if (executionState != ExecutionState.RUNNING) {
 				return;
 			}
 			
@@ -996,7 +996,7 @@ public class Task implements Runnable {
 				
 				// double-check for execution state, and make sure we clean up after ourselves
 				// if we created the dispatcher while the task was concurrently canceled
-				if (isCanceledOrFailed()) {
+				if (executionState != ExecutionState.RUNNING) {
 					executor.shutdown();
 					asyncCallDispatcher = null;
 					return;
@@ -1009,9 +1009,10 @@ public class Task implements Runnable {
 				executor.submit(runnable);
 			}
 			catch (RejectedExecutionException e) {
-				// may be that we are concurrently canceled. if not, report that something is fishy
-				if (!isCanceledOrFailed()) {
-					throw new RuntimeException("Async call was rejected, even though the task was not canceled.", e);
+				// may be that we are concurrently finished or canceled.
+				// if not, report that something is fishy
+				if (executionState == ExecutionState.RUNNING) {
+					throw new RuntimeException("Async call was rejected, even though the task is running.", e);
 				}
 			}
 		}