You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/29 10:56:32 UTC

flink git commit: [FLINK-2109] [runtime] Fix CancelTaskException handling

Repository: flink
Updated Branches:
  refs/heads/master 4c9f16116 -> d594d0242


[FLINK-2109] [runtime] Fix CancelTaskException handling

This closes #745.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d594d024
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d594d024
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d594d024

Branch: refs/heads/master
Commit: d594d02426898c3bf3c1fd52417c75921e28b61c
Parents: 4c9f161
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu May 28 19:40:08 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri May 29 10:55:19 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/taskmanager/Task.java  | 47 ++++++++++------
 .../flink/runtime/taskmanager/TaskTest.java     | 57 +++++++++++++++++++-
 2 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d594d024/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 40198dc..6250837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -537,7 +537,7 @@ public class Task implements Runnable {
 			//  actual task core work
 			// ----------------------------------------------------------------
 
-			// we must make strictly sure that the invokable is accessible to teh cancel() call
+			// we must make strictly sure that the invokable is accessible to the cancel() call
 			// by the time we switched to running.
 			this.invokable = invokable;
 
@@ -597,22 +597,25 @@ public class Task implements Runnable {
 				// to failExternally()
 				while (true) {
 					ExecutionState current = this.executionState;
+
 					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
-						if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-							// proper failure of the task. record the exception as the root cause
-							failureCause = t;
-							notifyObservers(ExecutionState.FAILED, t);
-
-							// in case of an exception during execution, we still call "cancel()" on the task
-							if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
-								try {
-									invokable.cancel();
-								}
-								catch (Throwable t2) {
-									LOG.error("Error while canceling task " + taskNameWithSubtask, t2);
-								}
+						if (t instanceof CancelTaskException) {
+							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+								cancelInvokable();
+
+								notifyObservers(ExecutionState.CANCELED, null);
+								break;
+							}
+						}
+						else {
+							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+								// proper failure of the task. record the exception as the root cause
+								failureCause = t;
+								cancelInvokable();
+
+								notifyObservers(ExecutionState.FAILED, t);
+								break;
 							}
-							break;
 						}
 					}
 					else if (current == ExecutionState.CANCELING) {
@@ -746,7 +749,7 @@ public class Task implements Runnable {
 	}
 
 	/**
-	 * Marks task execution failed for an external reason (a reason other than th task code itself
+	 * Marks task execution failed for an external reason (a reason other than the task code itself
 	 * throwing an exception). If the task is already in a terminal state
 	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
 	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
@@ -962,6 +965,18 @@ public class Task implements Runnable {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	private void cancelInvokable() {
+		// in case of an exception during execution, we still call "cancel()" on the task
+		if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+			try {
+				invokable.cancel();
+			}
+			catch (Throwable t) {
+				LOG.error("Error while canceling task " + taskNameWithSubtask, t);
+			}
+		}
+	}
+
 	@Override
 	public String toString() {
 		return getTaskNameWithSubtasks() + " [" + executionState + ']';

http://git-wip-us.apache.org/repos/asf/flink/blob/d594d024/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index bcc7ffe..e9e761c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -605,6 +606,41 @@ public class TaskTest {
 	}
 
 	@Test
+	public void testCancelTaskException() throws Exception {
+		final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+
+		// Cause CancelTaskException.
+		triggerLatch.trigger();
+
+		task.run();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+	}
+
+	@Test
+	public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
+		final Task task = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+
+		task.startTaskThread();
+
+		// Wait till the task is in invoke.
+		awaitLatch.await();
+
+		task.failExternally(new Exception("external"));
+		assertEquals(ExecutionState.FAILED, task.getExecutionState());
+
+		// Either we cause the CancelTaskException here, or the TaskCanceler
+		// causes it by interrupting the invokable.
+		triggerLatch.trigger();
+
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.FAILED, task.getExecutionState());
+		assertTrue(task.isCanceledOrFailed());
+		assertTrue(task.getFailureCause().getMessage().contains("external"));
+	}
+
+	@Test
 	public void testOnPartitionStateUpdate() throws Exception {
 		IntermediateDataSetID resultId = new IntermediateDataSetID();
 		ResultPartitionID partitionId = new ResultPartitionID();
@@ -900,7 +936,7 @@ public class TaskTest {
 					// fall through the loop
 				}
 			}
-			
+
 			throw new RuntimeException("test");
 		}
 	}
@@ -940,4 +976,23 @@ public class TaskTest {
 			}
 		}
 	}
+
+	public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			awaitLatch.trigger();
+
+			try {
+				triggerLatch.await();
+			}
+			finally {
+				throw new CancelTaskException();
+			}
+		}
+	}
 }