You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/12 17:46:32 UTC
flink git commit: [FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing.
Repository: flink
Updated Branches:
refs/heads/master 9350264bd -> d0ecb9170
[FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0ecb917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0ecb917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0ecb917
Branch: refs/heads/master
Commit: d0ecb9170e5edef48c7efd95764eeec7dbdf51a8
Parents: 9350264
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 16:19:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 12 16:25:16 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/taskmanager/Task.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0ecb917/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d9168e3..1b2fb08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -982,7 +982,7 @@ public class Task implements Runnable {
private void executeAsyncCallRunnable(Runnable runnable, String callName) {
// make sure the executor is initialized. lock against concurrent calls to this function
synchronized (this) {
- if (isCanceledOrFailed()) {
+ if (executionState != ExecutionState.RUNNING) {
return;
}
@@ -996,7 +996,7 @@ public class Task implements Runnable {
// double-check for execution state, and make sure we clean up after ourselves
// if we created the dispatcher while the task was concurrently canceled
- if (isCanceledOrFailed()) {
+ if (executionState != ExecutionState.RUNNING) {
executor.shutdown();
asyncCallDispatcher = null;
return;
@@ -1009,9 +1009,10 @@ public class Task implements Runnable {
executor.submit(runnable);
}
catch (RejectedExecutionException e) {
- // may be that we are concurrently canceled. if not, report that something is fishy
- if (!isCanceledOrFailed()) {
- throw new RuntimeException("Async call was rejected, even though the task was not canceled.", e);
+ // may be that we are concurrently finished or canceled.
+ // if not, report that something is fishy
+ if (executionState == ExecutionState.RUNNING) {
+ throw new RuntimeException("Async call was rejected, even though the task is running.", e);
}
}
}