You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 10:00:45 UTC

[2/2] flink git commit: [FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

Manually applied and adapted commit dfc6fba5b9830e6a7804a6a0c9f69b36bf772730 for
the `release-1.2` branch.

This closes #3811.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa54691
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa54691
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa54691

Branch: refs/heads/release-1.2
Commit: daa54691255158b1fcd0a55193ae3766efd79b12
Parents: 852a710
Author: Matt Zimmer <zi...@netflix.com>
Authored: Tue May 2 16:46:13 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 09:28:51 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   4 +-
 .../flink/configuration/TaskManagerOptions.java |   5 +
 .../org/apache/flink/util/ExceptionUtils.java   |  37 +++
 .../apache/flink/runtime/taskmanager/Task.java  |  14 ++
 .../taskmanager/TaskManagerRuntimeInfo.java     |  32 ++-
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../taskmanager/TaskManagerConfiguration.scala  |   8 +-
 .../flink/runtime/testutils/TestJvmProcess.java |   9 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   | 244 +++++++++++++++++++
 .../flink/core/testutils/CommonTestUtils.java   |  25 ++
 10 files changed, 373 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index d5863a1..5f669e8 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m
 
 - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
 
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend setting this value to `false` as the core state backends currently do not use the managed memory.
 
 ### Memory and Performance Debugging
 
@@ -263,6 +263,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.max-registration-pause`: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **30 s**)
 
+- `taskmanager.jvm-exit-on-oom`: Indicates whether the TaskManager should immediately terminate the JVM if the task thread throws an `OutOfMemoryError` (DEFAULT: **false**).
+
 - `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**)
 
 - `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 3bd15fe..05f670c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -34,6 +34,11 @@ public class TaskManagerOptions {
 
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
 
+	/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
+	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+		key("taskmanager.jvm-exit-on-oom")
+			.defaultValue(false);
+
 	// ------------------------------------------------------------------------
 	//  Network Options
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..8ec3d59 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * A collection of utility functions for dealing with exceptions and exception workflows.
+ */
 @Internal
 public final class ExceptionUtils {
+
+	/** The stringified representation of a null exception reference */
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
 	/**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, meaning a state where continued normal operation can only be
+	 * guaranteed via a clean process restart.
+	 *
+	 * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
+	 * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
+	 * and {@link java.util.zip.ZipError} (a special case of InternalError).
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is considered fatal to the JVM, false otherwise.
+	 */
+	public static boolean isJvmFatalError(Throwable t) {
+		return (t instanceof InternalError) || (t instanceof UnknownError);
+	}
+
+	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, or an out-of-memory error.
+	 *
+	 * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list of fatal JVM errors.
+	 * This method additionally classifies the {@link OutOfMemoryError} as fatal, because it
+	 * may occur in any thread (not necessarily the one that allocated the majority of the memory)
+	 * and thus is often not recoverable by destroying the particular thread that threw the exception.
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is fatal to the JVM or an OutOfMemoryError, false otherwise.
+	 */
+	public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+		return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 65b3053..8b51088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -698,6 +699,19 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------------------------------------------
 
 			try {
+				// check if the exception is unrecoverable
+				if (ExceptionUtils.isJvmFatalError(t) ||
+					(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+				{
+					// terminate the JVM immediately
+					// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
+					try {
+						LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
+					} finally {
+						Runtime.getRuntime().halt(-1);
+					}
+				}
+
 				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
 				// loop for multiple retries during concurrent state changes via calls to cancel() or
 				// to failExternally()

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..041392b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -38,7 +39,10 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
 
 	/** list of temporary file directories */
 	private final String[] tmpDirectories;
-	
+
+	/** Flag that signals whether to halt the JVM if an OutOfMemoryError is thrown */
+	private final boolean exitJvmOnOutOfMemory;
+
 	/**
 	 * Creates a runtime info.
 	 * 
@@ -49,18 +53,30 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
 	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
 		this(hostname, configuration, new String[] { tmpDirectory });
 	}
-	
+
 	/**
 	 * Creates a runtime info.
 	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
 	 * @param configuration The configuration that the TaskManager was started with.
-	 * @param tmpDirectories The list of temporary file directories.   
+	 * @param tmpDirectories The list of temporary file directories.
 	 */
 	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
+		this(hostname, configuration, tmpDirectories, configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY));
+	}
+
+	/**
+	 * Creates a runtime info.
+	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
+	 * @param configuration The configuration that the TaskManager was started with.
+	 * @param tmpDirectories The list of temporary file directories.
+	 * @param exitJvmOnOutOfMemory True to terminate the JVM on an OutOfMemoryError, false otherwise.
+	 */
+	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories, boolean exitJvmOnOutOfMemory) {
 		checkArgument(tmpDirectories.length > 0);
 		this.hostname = checkNotNull(hostname);
 		this.configuration = checkNotNull(configuration);
 		this.tmpDirectories = tmpDirectories;
+		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 		
 	}
 
@@ -87,4 +103,14 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
 	public String[] getTmpDirectories() {
 		return tmpDirectories;
 	}
+
+	/**
+	 * Checks whether the TaskManager should exit the JVM when the task thread throws
+	 * an OutOfMemoryError.
+	 *
+	 * @return True to terminate the JVM on an OutOfMemoryError, false otherwise.
+	 */
+	public boolean shouldExitJvmOnOutOfMemoryError() {
+		return exitJvmOnOutOfMemory;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bc63655..bb93fa1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2382,7 +2382,8 @@ object TaskManager {
       configuration,
       initialRegistrationPause,
       maxRegistrationPause,
-      refusedRegistrationPause)
+      refusedRegistrationPause,
+      configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
 
     (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index aab3c5f..929ff55 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
 
 import scala.concurrent.duration.FiniteDuration
 
@@ -33,7 +33,8 @@ case class TaskManagerConfiguration(
     configuration: Configuration,
     initialRegistrationPause: FiniteDuration,
     maxRegistrationPause: FiniteDuration,
-    refusedRegistrationPause: FiniteDuration) {
+    refusedRegistrationPause: FiniteDuration,
+    exitJvmOnOutOfMemory: Boolean) {
 
   def this(
       tmpDirPaths: Array[String],
@@ -51,6 +52,7 @@ case class TaskManagerConfiguration(
       configuration,
       FiniteDuration(500, TimeUnit.MILLISECONDS),
       FiniteDuration(30, TimeUnit.SECONDS),
-      FiniteDuration(10, TimeUnit.SECONDS))
+      FiniteDuration(10, TimeUnit.SECONDS),
+      configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
 		}
 	}
 
+	public void waitFor() throws InterruptedException {
+		Process process = this.process;
+		if (process != null) {
+			process.waitFor();
+		} else {
+			throw new IllegalStateException("process not started");
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// File based synchronization utilities
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..bf75549
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.*;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that the {@link Task} halts the JVM when the task thread throws an
+ * {@link OutOfMemoryError} and the TaskManager is configured to exit on out-of-memory errors.
+ */
+public class JvmExitOnFatalErrorTest {
+
+	@Test
+	public void testExitJvmOnOutOfMemory() throws Exception {
+		// this test works only on linux
+		assumeTrue(OperatingSystem.isLinux());
+
+		// this test leaves remaining processes if not executed with Java 8
+		CommonTestUtils.assumeJava8();
+
+		// to check what went wrong (when the test hangs) uncomment this line
+//		ProcessEntryPoint.main(new String[0]);
+
+		final KillOnFatalErrorProcess testProcess = new KillOnFatalErrorProcess();
+
+		try {
+			testProcess.startProcess();
+			testProcess.waitFor();
+		}
+		finally {
+			testProcess.destroy();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Process Implementation
+	// ------------------------------------------------------------------------
+
+	private static final class KillOnFatalErrorProcess extends TestJvmProcess {
+
+		public KillOnFatalErrorProcess() throws Exception {}
+
+		@Override
+		public String getName() {
+			return "KillOnFatalErrorProcess";
+		}
+
+		@Override
+		public String[] getJvmArgs() {
+			return new String[0];
+		}
+
+		@Override
+		public String getEntryPointClassName() {
+			return ProcessEntryPoint.class.getName();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static final class ProcessEntryPoint {
+
+		public static void main(String[] args) throws Exception {
+
+			System.err.println("creating task");
+
+			// we catch and report all errors during setup here, so that the process does not
+			// exit accidentally due to a programming error (which would let the test pass spuriously)
+			try {
+				final Configuration taskManagerConfig = new Configuration();
+				taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+				final JobID jid = new JobID();
+				final JobVertexID jobVertexId = new JobVertexID();
+				final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+				final AllocationID slotAllocationId = new AllocationID();
+
+				final SerializedValue<ExecutionConfig> execConfig = new SerializedValue<>(new ExecutionConfig());
+
+				final JobInformation jobInformation = new JobInformation(
+					jid, "Test Job", execConfig, new Configuration(),
+					Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+
+				final TaskInformation taskInformation = new TaskInformation(
+					jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
+
+				final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
+				final IOManager ioManager = new IOManagerAsync();
+
+				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+				when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+				final String[] tmpDirPaths = taskManagerConfig.getString(
+					ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+					ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+				final TaskManagerRuntimeInfo tmInfo = new TaskManagerRuntimeInfo("test", taskManagerConfig, tmpDirPaths);
+
+				final Executor executor = Executors.newCachedThreadPool();
+
+				Task task = new Task(
+					jobInformation,
+					taskInformation,
+					executionAttemptID,
+					0,       // subtaskIndex
+					0,       // attemptNumber
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
+					0,       // targetSlotNumber
+					null,    // taskStateHandles,
+					memoryManager,
+					ioManager,
+					networkEnvironment,
+					new BroadcastVariableManager(),
+					new ActorGatewayTaskManagerConnection(new DummyActorGateway()),
+					new NoOpInputSplitProvider(),
+					new NoOpCheckpointResponder(),
+					new FallbackLibraryCacheManager(),
+					new FileCache(taskManagerConfig),
+					tmInfo,
+					new UnregisteredTaskMetricsGroup(),
+					new NoOpResultPartitionConsumableNotifier(),
+					new NoOpPartitionProducerStateChecker(),
+					executor);
+
+				System.err.println("starting task thread");
+
+				task.startTaskThread();
+			}
+			catch (Throwable t) {
+				System.err.println("ERROR STARTING TASK");
+				t.printStackTrace();
+			}
+
+			System.err.println("parking the main thread");
+			CommonTestUtils.blockForeverNonInterruptibly();
+		}
+
+		public static final class OomInvokable extends AbstractInvokable {
+
+			@Override
+			public void invoke() throws Exception {
+				throw new OutOfMemoryError();
+			}
+		}
+
+		private static final class NoOpInputSplitProvider implements InputSplitProvider {
+
+			@Override
+			public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+				return null;
+			}
+		}
+
+		private static final class NoOpCheckpointResponder implements CheckpointResponder {
+
+			@Override
+			public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+			@Override
+			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
+		}
+
+		private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+			@Override
+			public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+		}
+
+		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+			@Override
+			public Future<ExecutionState> requestPartitionProducerState(
+				JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
 	}
 
 	/**
+	 * Permanently blocks the current thread. The thread cannot be woken
+	 * up via {@link Thread#interrupt()}.
+	 */
+	public static void blockForeverNonInterruptibly() {
+		final Object lock = new Object();
+		//noinspection InfiniteLoopStatement
+		while (true) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (lock) {
+					lock.wait();
+				}
+			} catch (InterruptedException ignored) {}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Preconditions on the test environment
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, this throws a
 	 * {@link AssumptionViolatedException}, which causes JUnit to skip the test that
 	 * called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Manipulation of environment
+	// ------------------------------------------------------------------------
+
 	public static void setEnv(Map<String, String> newenv) {
 		setEnv(newenv, true);
 	}