You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/27 15:41:58 UTC

flink git commit: [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

Repository: flink
Updated Branches:
  refs/heads/release-1.1 529534f33 -> cc6655b7b


[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

- Splits the cancellation up into two threads:
  * The `TaskCanceler` calls `cancel` on the invokable and `interrupt`
    on the executing Thread. It then waits for the watch dog (if any) and exits.
  * The `TaskCancelerWatchDog` kicks in after the task cancellation
    interval (current default: 30 secs) and periodically calls `interrupt`
    on the executing Thread. If the Thread does not terminate within
    the task cancellation timeout (new config value, default 3 mins on master;
    deactivated in this backport), the task manager is notified about a fatal
    error, leading to termination of the JVM.
- The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
  (default: 3 mins on master; 0, i.e. deactivated, in this backport) and the
  `ExecutionConfig` (similar to the cancellation interval).

Backported with slight adjustments from the master branch.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc6655b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc6655b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc6655b7

Branch: refs/heads/release-1.1
Commit: cc6655b7b4518a105971c7965d313a7aefcfc613
Parents: 529534f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 18 09:50:36 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 27 17:41:44 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  46 +++-
 .../flink/configuration/ConfigConstants.java    |  14 +
 .../apache/flink/runtime/taskmanager/Task.java  | 157 +++++++++--
 ...TaskManagerProcessReapingFatalErrorTest.java |  40 +++
 .../TaskManagerProcessReapingTest.java          | 241 +---------------
 .../TaskManagerProcessReapingTestBase.java      | 275 +++++++++++++++++++
 .../flink/runtime/taskmanager/TaskTest.java     | 247 ++++++++++++++++-
 .../flink/core/testutils/OneShotLatch.java      |   9 +
 8 files changed, 748 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5b69794..cbcafed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -19,11 +19,9 @@
 package org.apache.flink.api.common;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.Preconditions;
-
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -32,6 +30,8 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A config to define the behavior of the program execution. It allows to define (among other
  * options) the following settings:
@@ -116,6 +116,12 @@ public class ExecutionConfig implements Serializable {
 	
 	private long taskCancellationIntervalMillis = -1;
 
+	/**
+	 * Timeout after which an ongoing task cancellation will lead to a fatal
+	 * TaskManager error, usually killing the JVM.
+	 */
+	private long taskCancellationTimeoutMillis = -1;
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -219,7 +225,7 @@ public class ExecutionConfig implements Serializable {
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+		checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
 			"The parallelism of an operator must be at least 1.");
 
 		this.parallelism = parallelism;
@@ -245,6 +251,38 @@ public class ExecutionConfig implements Serializable {
 	}
 
 	/**
+	 * Returns the timeout (in milliseconds) after which an ongoing task
+	 * cancellation leads to a fatal TaskManager error.
+	 *
+	 * <p>The value <code>0</code> disables the timeout. In this case a stuck
+	 * cancellation will not lead to a fatal error.
+	 */
+	@PublicEvolving
+	public long getTaskCancellationTimeout() {
+		return this.taskCancellationTimeoutMillis;
+	}
+
+	/**
+	 * Sets the timeout (in milliseconds) after which an ongoing task cancellation
+	 * is considered failed, leading to a fatal TaskManager error.
+	 *
+	 * <p>By default, this is deactivated.
+	 *
+	 * <p>The cluster default is configured via {@link org.apache.flink.configuration.ConfigConstants#TASK_CANCELLATION_TIMEOUT_MILLIS}.
+	 *
+	 * <p>The value <code>0</code> disables the timeout. In this case a stuck
+	 * cancellation will not lead to a fatal error.
+	 *
+	 * @param timeout The task cancellation timeout (in milliseconds).
+	 */
+	@PublicEvolving
+	public ExecutionConfig setTaskCancellationTimeout(long timeout) {
+		checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
+		this.taskCancellationTimeoutMillis = timeout;
+		return this;
+	}
+
+	/**
 	 * Sets the restart strategy to be used for recovery.
 	 *
 	 * <pre>{@code

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9902350..d1ad1c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -270,6 +270,13 @@ public final class ConfigConstants {
 	@PublicEvolving
 	public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval";
 
+	/**
+	 * Timeout in milliseconds after which a task cancellation times out and
+	 * leads to a fatal TaskManager error.
+	 */
+	@PublicEvolving
+	public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";
+
 	// --------------------------- Runtime Algorithms -------------------------------
 	
 	/**
@@ -859,6 +866,13 @@ public final class ConfigConstants {
 	 * */
 	public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000;
 
+	/**
+	 * Default timeout in milliseconds after which a task cancellation times out
+	 * and leads to a fatal TaskManager error. This has been backported from 1.2 and
+	 * deactivated by default.
+	 */
+	public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated
+
 	// ------------------------ Runtime Algorithms ------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 25a7e29..4b47cba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -52,18 +52,18 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -76,6 +76,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -229,6 +230,9 @@ public class Task implements Runnable {
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
 
+	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
+	private long taskCancellationTimeout;
+
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
 	 * be undone in the case of a failing task deployment.</p>
@@ -259,9 +263,14 @@ public class Task implements Runnable {
 		this.operatorState = tdd.getOperatorState();
 		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
 
-		this.taskCancellationInterval = jobConfiguration.getLong(
-			ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
-			ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+		Configuration taskConfig = tdd.getTaskConfiguration();
+		this.taskCancellationInterval = taskConfig.getLong(
+				ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+				ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+
+		this.taskCancellationTimeout = taskConfig.getLong(
+				ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS,
+				ConfigConstants.DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS);
 
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
@@ -381,6 +390,16 @@ public class Task implements Runnable {
 		return executingThread;
 	}
 
+	@VisibleForTesting
+	long getTaskCancellationInterval() {
+		return taskCancellationInterval;
+	}
+
+	@VisibleForTesting
+	long getTaskCancellationTimeout() {
+		return taskCancellationTimeout;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Task Execution
 	// ------------------------------------------------------------------------
@@ -478,6 +497,11 @@ public class Task implements Runnable {
 				taskCancellationInterval = executionConfig.getTaskCancellationInterval();
 			}
 
+			if (executionConfig.getTaskCancellationTimeout() >= 0) {
+				// override task cancellation timeout from Flink config if set in ExecutionConfig
+				taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
+			}
+
 			// now load the task's invokable code
 			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 
@@ -868,12 +892,16 @@ public class Task implements Runnable {
 						// because the canceling may block on user code, we cancel from a separate thread
 						// we do not reuse the async call handler, because that one may be blocked, in which
 						// case the canceling could not continue
+
+						// The canceller calls cancel and interrupts the executing thread once
 						Runnable canceler = new TaskCanceler(
 								LOG,
 								invokable,
 								executingThread,
 								taskNameWithSubtask,
 								taskCancellationInterval,
+								taskCancellationTimeout,
+								taskManager,
 								producedPartitions,
 								inputGates);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
@@ -1119,16 +1147,29 @@ public class Task implements Runnable {
 		private final AbstractInvokable invokable;
 		private final Thread executer;
 		private final String taskName;
-		private final long taskCancellationIntervalMillis;
 		private final ResultPartition[] producedPartitions;
 		private final SingleInputGate[] inputGates;
 
+		/** Interrupt interval. */
+		private final long interruptInterval;
+
+		/** Timeout after which a fatal error notification happens. */
+		private final long interruptTimeout;
+
+		/** TaskManager to notify about a timeout */
+		private final ActorGateway taskManager;
+
+		/** Watch Dog thread */
+		private final Thread watchDogThread;
+
 		public TaskCanceler(
 				Logger logger,
 				AbstractInvokable invokable,
 				Thread executer,
 				String taskName,
-				long cancelationInterval,
+				long cancellationInterval,
+				long cancellationTimeout,
+				ActorGateway taskManager,
 				ResultPartition[] producedPartitions,
 				SingleInputGate[] inputGates) {
 
@@ -1136,26 +1177,46 @@ public class Task implements Runnable {
 			this.invokable = invokable;
 			this.executer = executer;
 			this.taskName = taskName;
-			this.taskCancellationIntervalMillis = cancelationInterval;
+			this.interruptInterval = cancellationInterval;
+			this.interruptTimeout = cancellationTimeout;
+			this.taskManager = taskManager;
 			this.producedPartitions = producedPartitions;
 			this.inputGates = inputGates;
+
+			if (cancellationTimeout > 0) {
+				// The watch dog repeatedly interrupts the executor until
+				// the cancellation timeout kicks in (at which point the
+				// task manager is notified about a fatal error) or the
+				// executor has terminated.
+				this.watchDogThread = new Thread(
+						executer.getThreadGroup(),
+						new TaskCancelerWatchDog(),
+						"WatchDog for " + taskName + " cancellation");
+				this.watchDogThread.setDaemon(true);
+			} else {
+				this.watchDogThread = null;
+			}
 		}
 
 		@Override
 		public void run() {
 			try {
+				if (watchDogThread != null) {
+					watchDogThread.start();
+				}
+
 				// the user-defined cancel method may throw errors.
 				// we need do continue despite that
 				try {
 					invokable.cancel();
-				}
-				catch (Throwable t) {
+				} catch (Throwable t) {
 					logger.error("Error while canceling the task", t);
 				}
 
 				// Early release of input and output buffer pools. We do this
 				// in order to unblock async Threads, which produce/consume the
-				// intermediate streams outside of the main Task Thread.
+				// intermediate streams outside of the main Task Thread (like
+				// the Kafka consumer).
 				//
 				// Don't do this before cancelling the invokable. Otherwise we
 				// will get misleading errors in the logs.
@@ -1178,16 +1239,45 @@ public class Task implements Runnable {
 				// interrupt the running thread initially
 				executer.interrupt();
 				try {
-					executer.join(taskCancellationIntervalMillis);
+					executer.join(interruptInterval);
 				}
 				catch (InterruptedException e) {
 					// we can ignore this
 				}
 
-				// it is possible that the user code does not react immediately. for that
-				// reason, we spawn a separate thread that repeatedly interrupts the user code until
-				// it exits
+				if (watchDogThread != null) {
+					watchDogThread.interrupt();
+					watchDogThread.join();
+				}
+			} catch (Throwable t) {
+				logger.error("Error in the task canceler", t);
+			}
+		}
+
+		/**
+		 * Watchdog for the cancellation. If the task is stuck in cancellation,
+		 * we notify the task manager about a fatal error.
+		 */
+		private class TaskCancelerWatchDog implements Runnable {
+
+			@Override
+			public void run() {
+				long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+				long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+				long deadline = System.nanoTime() + timeoutNanos;
+
+				try {
+					// Initial wait before interrupting periodically
+					Thread.sleep(interruptInterval);
+				} catch (InterruptedException ignored) {
+				}
+
+				// It is possible that the user code does not react to the task canceller.
+				// For that reason, this separate thread repeatedly interrupts
+				// the user code until it exits. If the user code does not exit within
+				// the timeout, we notify the task manager about a fatal error.
 				while (executer.isAlive()) {
+					long now = System.nanoTime();
 
 					// build the stack trace of where the thread is stuck, for the log
 					StringBuilder bld = new StringBuilder();
@@ -1196,21 +1286,34 @@ public class Task implements Runnable {
 						bld.append(e).append('\n');
 					}
 
-					logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
-							taskName, bld.toString());
+					if (now >= deadline) {
+						long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+						String msg = String.format("Task '%s' did not react to cancelling signal in " +
+										"the last %d seconds, but is stuck in method:\n %s",
+								taskName,
+								duration,
+								bld.toString());
 
-					executer.interrupt();
-					try {
-						executer.join(taskCancellationIntervalMillis);
-					}
-					catch (InterruptedException e) {
-						// we can ignore this
+						taskManager.tell(new TaskManagerMessages.FatalError(msg, null));
+
+						return; // done, don't forget to leave the loop
+					} else {
+						logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+								taskName, bld.toString());
+
+						executer.interrupt();
+						try {
+							long timeLeftNanos = Math.min(intervalNanos, deadline - now);
+							long timeLeftMillis = TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS);
+
+							if (timeLeftMillis > 0) {
+								executer.join(timeLeftMillis);
+							}
+						} catch (InterruptedException ignored) {
+						}
 					}
 				}
 			}
-			catch (Throwable t) {
-				logger.error("Error in the task canceler", t);
-			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
new file mode 100644
index 0000000..1f0e84d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager actor receives a FatalError message.
+ */
+public class TaskManagerProcessReapingFatalErrorTest extends TaskManagerProcessReapingTestBase {
+
+	@Override
+	void onTaskManagerProcessRunning(ActorRef taskManager) {
+		taskManager.tell(new TaskManagerMessages.FatalError("ouch", null), ActorRef.noSender());
+	}
+
+	@Override
+	void onTaskManagerProcessTerminated(String processOutput) {
+		assertTrue("Did not log expected message", processOutput.contains("ouch"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 85d6ede..8aed021 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -19,248 +19,17 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Test;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
-
 /**
  * Tests that the TaskManager process properly exits when the TaskManager actor dies.
  */
-public class TaskManagerProcessReapingTest {
-
-	@Test
-	public void testReapProcessOnFailure() {
-		Process taskManagerProcess = null;
-		ActorSystem jmActorSystem = null;
-
-		final StringWriter processOutput = new StringWriter();
-
-		try {
-			String javaCommand = getJavaCommandPath();
-
-			// check that we run this test only if the java command
-			// is available on this machine
-			if (javaCommand == null) {
-				System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----");
-				return;
-			}
-
-			// create a logging file for the process
-			File tempLogFile = File.createTempFile("testlogconfig", "properties");
-			tempLogFile.deleteOnExit();
-			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-			final InetAddress localhost = InetAddress.getByName("localhost");
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
-			jmActorSystem = AkkaUtils.createActorSystem(
-					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
-
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				new Configuration(),
-				jmActorSystem,
-				JobManager.class,
-				MemoryArchivist.class)._1;
-
-			// start a ResourceManager
-			StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
-				new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
-			FlinkResourceManager.startResourceManagerActors(
-				new Configuration(),
-				jmActorSystem,
-				standaloneLeaderRetrievalService,
-				StandaloneResourceManager.class);
-
-			final int taskManagerPort = NetUtils.getAvailablePort();
-
-			// start the task manager process
-			String[] command = new String[] {
-					javaCommand,
-					"-Dlog.level=DEBUG",
-					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
-					"-Xms256m", "-Xmx256m",
-					"-classpath", getCurrentClasspath(),
-					TaskManagerTestEntryPoint.class.getName(),
-					String.valueOf(jobManagerPort), String.valueOf(taskManagerPort)
-			};
-
-			ProcessBuilder bld = new ProcessBuilder(command);
-			taskManagerProcess = bld.start();
-			new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
-
-			// grab the reference to the TaskManager. try multiple times, until the process
-			// is started and the TaskManager is up
-			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
-					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
-					TaskManager.TASK_MANAGER_NAME());
-
-			ActorRef taskManagerRef = null;
-			Throwable lastError = null;
-			for (int i = 0; i < 40; i++) {
-				try {
-					taskManagerRef = TaskManager.getTaskManagerRemoteReference(
-							taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
-					break;
-				}
-				catch (Throwable t) {
-					// TaskManager probably not ready yet
-					lastError = t;
-				}
-				Thread.sleep(500);
-			}
+public class TaskManagerProcessReapingTest extends TaskManagerProcessReapingTestBase {
 
-			assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess));
-
-			if (taskManagerRef == null) {
-				if (lastError != null) {
-					lastError.printStackTrace();
-				}
-				fail("TaskManager process did not launch the TaskManager properly. Failed to look up "
-						+ taskManagerActorName);
-			}
-
-			// kill the TaskManager actor
-			taskManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
-			// wait for max 5 seconds for the process to terminate
-			{
-				long now = System.currentTimeMillis();
-				long deadline = now + 10000;
-
-				while (now < deadline && isProcessAlive(taskManagerProcess)) {
-					Thread.sleep(100);
-					now = System.currentTimeMillis();
-				}
-			}
-
-			assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess));
-
-			int returnCode = taskManagerProcess.exitValue();
-			assertEquals("TaskManager died, but not because of the process reaper",
-					TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			printProcessLog(processOutput.toString());
-			fail(e.getMessage());
-		}
-		catch (Error e) {
-			e.printStackTrace();
-			printProcessLog(processOutput.toString());
-			throw e;
-		}
-		finally {
-			if (taskManagerProcess != null) {
-				taskManagerProcess.destroy();
-			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-		}
+	@Override
+	void onTaskManagerProcessRunning(ActorRef taskManager) {
+		// kill the TaskManager actor
+		taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
 	}
 
-	private static void printProcessLog(String log) {
-		System.out.println("-----------------------------------------");
-		System.out.println("       BEGIN SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-		System.out.println(log);
-		System.out.println("-----------------------------------------");
-		System.out.println("        END SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static class TaskManagerTestEntryPoint {
-
-		public static void main(String[] args) {
-			try {
-				int jobManagerPort = Integer.parseInt(args[0]);
-				int taskManagerPort = Integer.parseInt(args[1]);
-
-				Configuration cfg = new Configuration();
-				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
-
-				TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
-			catch (Throwable t) {
-				System.exit(1);
-			}
-		}
-	}
-
-	private static class PipeForwarder extends Thread {
-
-		private final StringWriter target;
-		private final InputStream source;
-
-		public PipeForwarder(InputStream source, StringWriter target) {
-			super("Pipe Forwarder");
-			setDaemon(true);
-
-			this.source = source;
-			this.target = target;
-
-			start();
-		}
-
-		@Override
-		public void run() {
-			try {
-				int next;
-				while ((next = source.read()) != -1) {
-					target.write(next);
-				}
-			}
-			catch (IOException e) {
-				// terminate
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
new file mode 100644
index 0000000..c7913f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.junit.Test;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the TaskManager process exits with the runtime failure return code after a subclass made the TaskManager fail.
+ */
+public abstract class TaskManagerProcessReapingTestBase {
+	/**
+	 * Called after the task manager has been started up. After calling this
+	 * method, the test base checks that the process exits.
+	 */
+	abstract void onTaskManagerProcessRunning(ActorRef taskManager);
+
+	/**
+	 * Called after the task manager has successfully terminated.
+	 */
+	void onTaskManagerProcessTerminated(String processOutput) {
+		// Default does nothing
+	}
+
+	/**
+	 * Spawns a TaskManager JVM against a local JobManager, lets the subclass make it fail, and asserts the process exits.
+	 */
+	@Test
+	public void testReapProcessOnFailure() {
+		Process taskManagerProcess = null;
+		ActorSystem jmActorSystem = null;
+
+		final StringWriter processOutput = new StringWriter();
+
+		try {
+			String javaCommand = getJavaCommandPath();
+
+			// check that we run this test only if the java command
+			// is available on this machine
+			if (javaCommand == null) {
+				System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile("testlogconfig", ".properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			final InetAddress localhost = InetAddress.getByName("localhost");
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+			jmActorSystem = AkkaUtils.createActorSystem(
+					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
+
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				new Configuration(),
+				jmActorSystem,
+				JobManager.class,
+				MemoryArchivist.class)._1;
+
+			// start a ResourceManager
+			StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
+				new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
+
+			FlinkResourceManager.startResourceManagerActors(
+				new Configuration(),
+				jmActorSystem,
+				standaloneLeaderRetrievalService,
+				StandaloneResourceManager.class);
+
+			final int taskManagerPort = NetUtils.getAvailablePort();
+
+			// start the task manager process
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms256m", "-Xmx256m",
+					"-classpath", getCurrentClasspath(),
+					TaskManagerTestEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort), String.valueOf(taskManagerPort)
+			};
+
+			ProcessBuilder bld = new ProcessBuilder(command);
+			taskManagerProcess = bld.start();
+			new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
+
+			// grab the reference to the TaskManager. try multiple times, until the process
+			// is started and the TaskManager is up
+			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+					TaskManager.TASK_MANAGER_NAME());
+
+			ActorRef taskManagerRef = null;
+			Throwable lastError = null;
+			for (int i = 0; i < 40; i++) {
+				try {
+					taskManagerRef = TaskManager.getTaskManagerRemoteReference(
+							taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+					break;
+				}
+				catch (Throwable t) {
+					// TaskManager probably not ready yet
+					lastError = t;
+				}
+				Thread.sleep(500);
+			}
+
+			assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess));
+
+			if (taskManagerRef == null) {
+				if (lastError != null) {
+					lastError.printStackTrace();
+				}
+				fail("TaskManager process did not launch the TaskManager properly. Failed to look up "
+						+ taskManagerActorName);
+			}
+
+			// let the subclass make the TaskManager fail (e.g. by killing its actor)
+			onTaskManagerProcessRunning(taskManagerRef);
+
+			// wait for max 10 seconds for the process to terminate
+			{
+				long now = System.currentTimeMillis();
+				long deadline = now + 10000;
+
+				while (now < deadline && isProcessAlive(taskManagerProcess)) {
+					Thread.sleep(100);
+					now = System.currentTimeMillis();
+				}
+			}
+
+			assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess));
+
+			int returnCode = taskManagerProcess.exitValue();
+			assertEquals("TaskManager died, but not because of the process reaper",
+					TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+
+			onTaskManagerProcessTerminated(processOutput.toString());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printProcessLog(processOutput.toString());
+			throw new AssertionError(e); // keeps the original exception as the cause
+		}
+		catch (Error e) {
+			e.printStackTrace();
+			printProcessLog(processOutput.toString());
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess != null) {
+				taskManagerProcess.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+		}
+	}
+
+	/** Prints the captured stderr of the spawned process between banner lines. */
+	private static void printProcessLog(String log) {
+		System.out.println("-----------------------------------------");
+		System.out.println("       BEGIN SPAWNED PROCESS LOG");
+		System.out.println("-----------------------------------------");
+		System.out.println(log);
+		System.out.println("-----------------------------------------");
+		System.out.println("        END SPAWNED PROCESS LOG");
+		System.out.println("-----------------------------------------");
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Main class of the spawned JVM; args are the JobManager port and the TaskManager port. */
+	public static class TaskManagerTestEntryPoint {
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+				int taskManagerPort = Integer.parseInt(args[1]);
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+
+				TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
+				// wait forever
+				Object lock = new Object();
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+			catch (Throwable t) {
+				t.printStackTrace(); // stderr is forwarded to the test, which prints it on failure
+				System.exit(1);
+			}
+		}
+	}
+
+	/** Daemon thread that copies a process output stream, byte by byte, into a StringWriter. */
+	private static class PipeForwarder extends Thread {
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+			this.source = source;
+			this.target = target;
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				int next;
+				while ((next = source.read()) != -1) {
+					target.write(next);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fec9ef3..56ab9c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -44,13 +44,14 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -69,7 +70,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -86,11 +86,12 @@ import static org.mockito.Mockito.when;
  * execution listener, which simply put the messages in a queue to be picked
  * up by the test and validated.
  */
-public class TaskTest {
+public class TaskTest extends TestLogger {
 	
 	private static OneShotLatch awaitLatch;
 	private static OneShotLatch triggerLatch;
-	
+	private static OneShotLatch cancelLatch;
+
 	private ActorGateway taskManagerGateway;
 	private ActorGateway jobManagerGateway;
 	private ActorGateway listenerGateway;
@@ -110,6 +111,7 @@ public class TaskTest {
 		
 		awaitLatch = new OneShotLatch();
 		triggerLatch = new OneShotLatch();
+		cancelLatch = new OneShotLatch();
 	}
 
 	@After
@@ -557,6 +559,123 @@ public class TaskTest {
 		verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
 	}
 
+	/**
+	 * Tests that the cancellation watch dog interrupts the task thread when the task canceller
+	 * is stuck in cancel(); the task must then terminate without a fatal error. The timeout
+	 * turns a missing interrupt (which would block join() forever) into a test failure.
+	 */
+	@Test(timeout = 30000L)
+	public void testWatchDogInterruptsTask() throws Exception {
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+		Task task = createTask(InvokableBlockingInCancel.class, config);
+		task.startTaskThread();
+
+		awaitLatch.await();
+
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		// No fatal error
+		for (Object msg : taskManagerMessages) {
+			assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+		}
+	}
+
+	/**
+	 * invoke() blocks while holding a lock (triggering awaitLatch after acquiring it), so
+	 * cancel() cannot complete because it needs the same lock. The watch dog's interrupt must
+	 * resolve this without a fatal error; the timeout guards against a join() that never returns.
+	 */
+	@Test(timeout = 30000L)
+	public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+		Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+		task.startTaskThread();
+
+		awaitLatch.await();
+
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		// No fatal error
+		for (Object msg : taskManagerMessages) {
+			assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+		}
+	}
+
+	/**
+	 * invoke() ignores interrupts and cancel() does not block, so cancellation can only be
+	 * resolved by a FatalError message sent to the TaskManager gateway.
+	 */
+	@Test(timeout = 30000L)
+	public void testFatalErrorAfterUninterruptibleInvoke() throws Exception {
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 5);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 50);
+
+		Task task = createTask(InvokableUninterruptibleBlockingInvoke.class, config);
+
+		try {
+			task.startTaskThread();
+
+			awaitLatch.await();
+
+			task.cancelExecution();
+
+			for (int i = 0; i < 10; i++) {
+				Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+				if (msg instanceof TaskManagerMessages.FatalError) {
+					return; // success
+				}
+			}
+
+			fail("Did not receive expected task manager message");
+		} finally {
+			// Release the loop in invoke() and wake it from wait() so that the task thread terminates
+			cancelLatch.trigger();
+			task.getExecutingThread().interrupt();
+			task.getExecutingThread().join();
+		}
+	}
+
+	/**
+	 * Tests that the cancellation interval/timeout come from the task configuration and are overwritten by the execution config.
+	 */
+	@Test(timeout = 30000L)
+	public void testTaskConfig() throws Exception {
+		long interval = 28218123;
+		long timeout = interval + 19292;
+
+		Configuration config = new Configuration();
+		config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, interval);
+		config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, timeout);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setTaskCancellationInterval(interval + 1337);
+		executionConfig.setTaskCancellationTimeout(timeout - 1337);
+
+		Task task = createTask(InvokableBlockingInInvoke.class, config, executionConfig);
+		try {
+			assertEquals(interval, task.getTaskCancellationInterval());
+			assertEquals(timeout, task.getTaskCancellationTimeout());
+			task.startTaskThread();
+			awaitLatch.await();
+			// once the task runs, the ExecutionConfig values take precedence over the task configuration
+			assertEquals(executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
+			assertEquals(executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout());
+		} finally {
+			// always stop the blocking invokable, even if an assertion above failed
+			task.getExecutingThread().interrupt();
+			task.getExecutingThread().join();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private void setInputGate(Task task, SingleInputGate inputGate) {
@@ -589,13 +708,28 @@ public class TaskTest {
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable) {
+		return createTask(invokable, new Configuration(), new ExecutionConfig());
+	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) {
+		return createTask(invokable, config, new ExecutionConfig());
+	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, libCache);
+		return createTask(invokable, libCache, config, execConfig);
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable,
 							LibraryCacheManager libCache) {
+		return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
+	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable,
+							LibraryCacheManager libCache,
+							Configuration config,
+							ExecutionConfig execConfig) {
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
@@ -604,14 +738,23 @@ public class TaskTest {
 		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		
-		return createTask(invokable, libCache, network);
+		return createTask(invokable, libCache, network, config, execConfig);
 	}
 	
 	private Task createTask(Class<? extends AbstractInvokable> invokable,
 							LibraryCacheManager libCache,
 							NetworkEnvironment networkEnvironment) {
+
+		return createTask(invokable, libCache, networkEnvironment, new Configuration(), new ExecutionConfig());
+	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable,
+							LibraryCacheManager libCache,
+							NetworkEnvironment networkEnvironment,
+							Configuration config,
+							ExecutionConfig execConfig) {
 		
-		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
+		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable, config, execConfig);
 		
 		return new Task(
 				tdd,
@@ -629,18 +772,26 @@ public class TaskTest {
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
-		SerializedValue<ExecutionConfig> execConfig;
+		return createTaskDeploymentDescriptor(invokable, new Configuration(), new ExecutionConfig());
+	}
+
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+			Class<? extends AbstractInvokable> invokable,
+			Configuration taskConfig,
+			ExecutionConfig execConfig) {
+
+		SerializedValue<ExecutionConfig> serializedExecConfig;
 		try {
-			execConfig = new SerializedValue<>(new ExecutionConfig());
+			serializedExecConfig = new SerializedValue<>(execConfig);
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
 		
 		return new TaskDeploymentDescriptor(
 				new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
-				execConfig,
+				serializedExecConfig,
 				"Test Task", 0, 1, 0,
-				new Configuration(), new Configuration(),
+				new Configuration(), taskConfig,
 				invokable.getName(),
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -842,4 +993,72 @@ public class TaskTest {
 			throw new CancelTaskException();
 		}
 	}
+
+	public static final class InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable {
+		// invoke() keeps this lock while blocked, so cancel() cannot proceed until invoke() is interrupted.
+		private final Object lock = new Object();
+
+		@Override
+		public void invoke() throws Exception {
+			synchronized (lock) {
+				awaitLatch.trigger();
+				Thread.sleep(Long.MAX_VALUE); // blocks until interrupted while still holding 'lock' (lock.wait() would release it)
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			synchronized (lock) {
+				cancelLatch.trigger();
+			}
+		}
+	}
+
+	public static final class InvokableBlockingInCancel extends AbstractInvokable {
+		// cancel() parks the canceller in wait(); invoke() calls notifyAll() once the task thread is interrupted.
+		@Override
+		public void invoke() throws Exception {
+			awaitLatch.trigger();
+			// wait for cancel() to trigger cancelLatch, then block in wait(); an interrupt lands in the catch below
+			try {
+				cancelLatch.await();
+				synchronized (this) {
+					wait();
+				}
+			} catch (InterruptedException ignored) {
+				synchronized (this) {
+					notifyAll(); // notify all that are stuck in cancel
+				}
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			// cancelLatch is triggered while holding this monitor; wait() then releases it and parks the canceller
+			synchronized (this) {
+				cancelLatch.trigger();
+				wait();
+			}
+		}
+	}
+
+	public static final class InvokableUninterruptibleBlockingInvoke extends AbstractInvokable {
+		// Interrupts are ignored: invoke() can return only once the test has triggered cancelLatch.
+		@Override
+		public void invoke() throws Exception {
+			while (!cancelLatch.isTriggered()) {
+				synchronized (this) {
+					awaitLatch.trigger();
+					try {
+						wait();
+					} catch (InterruptedException ignored) {
+						// swallowed on purpose: cancellation must not be able to end this method
+					}
+				}
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {} // deliberately a no-op
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc6655b7/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index d802860..9ad2c30 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -52,4 +52,13 @@ public final class OneShotLatch {
 			}
 		}
 	}
+
+	/**
+	 * Non-blocking probe of the latch state; the method body takes no lock.
+	 * NOTE(review): plain read of 'triggered' - assumes that field is volatile; confirm.
+	 * @return {@code true} once the latch has been triggered, {@code false} before that.
+	 */
+	public boolean isTriggered() {
+		return this.triggered;
+	}
 }