You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/29 10:56:32 UTC
flink git commit: [FLINK-2109] [runtime] Fix CancelTaskException handling
Repository: flink
Updated Branches:
refs/heads/master 4c9f16116 -> d594d0242
[FLINK-2109] [runtime] Fix CancelTaskException handling
This closes #745.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d594d024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d594d024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d594d024
Branch: refs/heads/master
Commit: d594d02426898c3bf3c1fd52417c75921e28b61c
Parents: 4c9f161
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu May 28 19:40:08 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri May 29 10:55:19 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/taskmanager/Task.java | 47 ++++++++++------
.../flink/runtime/taskmanager/TaskTest.java | 57 +++++++++++++++++++-
2 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d594d024/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 40198dc..6250837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -537,7 +537,7 @@ public class Task implements Runnable {
// actual task core work
// ----------------------------------------------------------------
- // we must make strictly sure that the invokable is accessible to teh cancel() call
+ // we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
@@ -597,22 +597,25 @@ public class Task implements Runnable {
// to failExternally()
while (true) {
ExecutionState current = this.executionState;
+
if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
- if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
- // proper failure of the task. record the exception as the root cause
- failureCause = t;
- notifyObservers(ExecutionState.FAILED, t);
-
- // in case of an exception during execution, we still call "cancel()" on the task
- if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
- try {
- invokable.cancel();
- }
- catch (Throwable t2) {
- LOG.error("Error while canceling task " + taskNameWithSubtask, t2);
- }
+ if (t instanceof CancelTaskException) {
+ if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+ cancelInvokable();
+
+ notifyObservers(ExecutionState.CANCELED, null);
+ break;
+ }
+ }
+ else {
+ if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ // proper failure of the task. record the exception as the root cause
+ failureCause = t;
+ cancelInvokable();
+
+ notifyObservers(ExecutionState.FAILED, t);
+ break;
}
- break;
}
}
else if (current == ExecutionState.CANCELING) {
@@ -746,7 +749,7 @@ public class Task implements Runnable {
}
/**
- * Marks task execution failed for an external reason (a reason other than th task code itself
+ * Marks task execution failed for an external reason (a reason other than the task code itself
* throwing an exception). If the task is already in a terminal state
* (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
* Otherwise it sets the state to FAILED, and, if the invokable code is running,
@@ -962,6 +965,18 @@ public class Task implements Runnable {
// Utilities
// ------------------------------------------------------------------------
+ private void cancelInvokable() {
+ // In case of an exception during execution, we still call "cancel()" on the task.
+ // The compareAndSet on invokableHasBeenCanceled lets only one caller through here;
+ // a failure inside cancel() is logged rather than propagated.
+ if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+ try {
+ invokable.cancel();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while canceling task " + taskNameWithSubtask, t);
+ }
+ }
+ }
+
@Override
public String toString() {
return getTaskNameWithSubtasks() + " [" + executionState + ']';
http://git-wip-us.apache.org/repos/asf/flink/blob/d594d024/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index bcc7ffe..e9e761c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -605,6 +606,41 @@ public class TaskTest {
}
@Test
+ public void testCancelTaskException() throws Exception {
+ final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+
+ // Cause CancelTaskException.
+ triggerLatch.trigger();
+
+ task.run();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ }
+
+ @Test
+ public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
+ final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+
+ task.startTaskThread();
+
+ // Wait till the task is in invoke.
+ awaitLatch.await();
+
+ task.failExternally(new Exception("external"));
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+
+ // Either we cause the CancelTaskException by triggering the latch, or the
+ // TaskCanceler causes it by interrupting the invokable.
+ triggerLatch.trigger();
+
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertTrue(task.getFailureCause().getMessage().contains("external"));
+ }
+
+ @Test
public void testOnPartitionStateUpdate() throws Exception {
IntermediateDataSetID resultId = new IntermediateDataSetID();
ResultPartitionID partitionId = new ResultPartitionID();
@@ -900,7 +936,7 @@ public class TaskTest {
// fall through the loop
}
}
-
+
throw new RuntimeException("test");
}
}
@@ -940,4 +976,23 @@ public class TaskTest {
}
}
}
+
+ public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ awaitLatch.trigger();
+
+ try {
+ triggerLatch.await();
+ }
+ finally {
+ throw new CancelTaskException();
+ }
+ }
+ }
}