You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 15:33:42 UTC

[3/4] flink git commit: [FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

This closes #3276


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfc6fba5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfc6fba5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfc6fba5

Branch: refs/heads/master
Commit: dfc6fba5b9830e6a7804a6a0c9f69b36bf772730
Parents: 3bde6ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 6 15:52:39 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:31 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   4 +-
 .../flink/configuration/TaskManagerOptions.java |   5 +
 .../org/apache/flink/util/ExceptionUtils.java   |  37 +++
 .../taskexecutor/TaskManagerConfiguration.java  |  19 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  14 +
 .../taskmanager/TaskManagerRuntimeInfo.java     |   8 +
 ...askManagerComponentsStartupShutdownTest.java |   3 +-
 .../flink/runtime/testutils/TestJvmProcess.java |   9 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   | 259 +++++++++++++++++++
 .../util/TestingTaskManagerRuntimeInfo.java     |   6 +
 .../flink/core/testutils/CommonTestUtils.java   |  25 ++
 11 files changed, 385 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2accdc2..b86c534 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m
 
 - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
 
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend setting this value to `false` as the core state backends currently do not use the managed memory.
 
 ### Memory and Performance Debugging
 
@@ -265,6 +265,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**)
 
+- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an `OutOfMemoryError` (DEFAULT: **false**).
+
 - `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
 
 - `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**).

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 6f6238b..b7ee20a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -33,6 +33,11 @@ public class TaskManagerOptions {
 	// ------------------------------------------------------------------------
 
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
+	
+	/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
+	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+			key("taskmanager.jvm-exit-on-oom")
+			.defaultValue(false);
 
 	// ------------------------------------------------------------------------
 	//  Network Options

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 1069f2d..6ba9ef6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * A collection of utility functions for dealing with exceptions and exception workflows.
+ */
 @Internal
 public final class ExceptionUtils {
+
+	/** The stringified representation of a null exception reference */ 
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
 	/**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, meaning a state where continued normal operation can only be
+	 * guaranteed via clean process restart.
+	 *
+	 * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
+	 * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
+	 * and {@link java.util.zip.ZipError} (a special case of InternalError).
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is considered fatal to the JVM, false otherwise.
+	 */
+	public static boolean isJvmFatalError(Throwable t) {
+		return (t instanceof InternalError) || (t instanceof UnknownError);
+	}
+
+	/**
+	 * Checks whether the given exception indicates a situation that may leave the
+	 * JVM in a corrupted state, or an out-of-memory error.
+	 * 
+	 * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list of fatal JVM errors.
+	 * This method additionally classifies the {@link OutOfMemoryError} as fatal, because it
+	 * may occur in any thread (not necessarily the one that allocated the majority of the memory) and thus
+	 * is often not recoverable by destroying the particular thread that threw the exception.
+	 * 
+	 * @param t The exception to check.
+	 * @return True, if the exception is fatal to the JVM or an OutOfMemoryError, false otherwise.
+	 */
+	public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+		return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1d1e732..a6e4748 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 
 import java.io.File;
@@ -53,6 +56,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
 	private final UnmodifiableConfiguration configuration;
 
+	private final boolean exitJvmOnOutOfMemory;
+
 	public TaskManagerConfiguration(
 		int numberSlots,
 		String[] tmpDirectories,
@@ -62,7 +67,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		Time maxRegistrationPause,
 		Time refusedRegistrationPause,
 		long cleanupInterval,
-		Configuration configuration) {
+		Configuration configuration,
+		boolean exitJvmOnOutOfMemory) {
 
 		this.numberSlots = numberSlots;
 		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
@@ -73,6 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
 		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 	}
 
 	public int getNumberSlots() {
@@ -113,6 +120,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return tmpDirectories;
 	}
 
+	@Override
+	public boolean shouldExitJvmOnOutOfMemoryError() {
+		return exitJvmOnOutOfMemory;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods
 	// --------------------------------------------------------------------------------------------
@@ -205,6 +217,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
 		}
 
+		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
+
 		return new TaskManagerConfiguration(
 			numberSlots,
 			tmpDirPaths,
@@ -214,6 +228,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			maxRegistrationPause,
 			refusedRegistrationPause,
 			cleanupInterval,
-			configuration);
+			configuration,
+			exitOnOom);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c2e6d09..64a83c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -700,6 +701,19 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------------------------------------------
 
 			try {
+				// check if the exception is unrecoverable
+				if (ExceptionUtils.isJvmFatalError(t) || 
+					(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+				{
+					// terminate the JVM immediately
+					// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
+					try {
+						LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
+					} finally {
+						Runtime.getRuntime().halt(-1);
+					}
+				}
+
 				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
 				// loop for multiple retries during concurrent state changes via calls to cancel() or
 				// to failExternally()

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index d1efe34..dbf8c73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -38,4 +38,12 @@ public interface TaskManagerRuntimeInfo {
 	 * @return The list of temporary file directories.
 	 */
 	String[] getTmpDirectories();
+
+	/**
+	 * Checks whether the TaskManager should exit the JVM when the task thread throws
+	 * an OutOfMemoryError.
+	 * 
+	 * @return True to terminate the JVM on an OutOfMemoryError, false otherwise.
+	 */
+	boolean shouldExitJvmOnOutOfMemoryError();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a6e1a2b..e26e176 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -109,7 +109,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 				Time.seconds(30),
 				Time.seconds(10),
 				1000000, // cleanup interval
-				config);
+				config,
+				false); // exit-jvm-on-out-of-memory
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
 		}
 	}
 
+	public void waitFor() throws InterruptedException {
+		Process process = this.process;
+		if (process != null) {
+			process.waitFor();
+		} else {
+			throw new IllegalStateException("process not started");
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// File based synchronization utilities
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..10f4303
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that the task thread terminates the JVM when the task throws
+ * a fatal error (here an {@link OutOfMemoryError} with 'taskmanager.jvm-exit-on-oom' enabled).
+ */
+public class JvmExitOnFatalErrorTest {
+
+	@Test
+	public void testExitJvmOnOutOfMemory() throws Exception {
+		// this test works only on linux
+		assumeTrue(OperatingSystem.isLinux());
+
+		// this test leaves remaining processes if not executed with Java 8
+		CommonTestUtils.assumeJava8();
+
+		// to check what went wrong (when the test hangs) uncomment this line 
+//		ProcessEntryPoint.main(new String[0]);
+
+		final KillOnFatalErrorProcess testProcess = new KillOnFatalErrorProcess();
+
+		try {
+			testProcess.startProcess();
+			testProcess.waitFor();
+		}
+		finally {
+			testProcess.destroy();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Blocking Process Implementation
+	// ------------------------------------------------------------------------
+
+	private static final class KillOnFatalErrorProcess extends TestJvmProcess {
+
+		public KillOnFatalErrorProcess() throws Exception {}
+
+		@Override
+		public String getName() {
+			return "KillOnFatalErrorProcess";
+		}
+
+		@Override
+		public String[] getJvmArgs() {
+			return new String[0];
+		}
+
+		@Override
+		public String getEntryPointClassName() {
+			return ProcessEntryPoint.class.getName();
+		}
+	} 
+
+	// ------------------------------------------------------------------------
+
+	public static final class ProcessEntryPoint {
+
+		public static void main(String[] args) throws Exception {
+
+			System.err.println("creating task");
+
+			// we suppress process exits via errors here to not
+			// have a test that exits accidentally due to a programming error 
+			try {
+				final Configuration taskManagerConfig = new Configuration();
+				taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+				final JobID jid = new JobID();
+				final JobVertexID jobVertexId = new JobVertexID();
+				final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+				final AllocationID slotAllocationId = new AllocationID();
+
+				final SerializedValue<ExecutionConfig> execConfig = new SerializedValue<>(new ExecutionConfig());
+
+				final JobInformation jobInformation = new JobInformation(
+						jid, "Test Job", execConfig, new Configuration(),
+						Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+
+				final TaskInformation taskInformation = new TaskInformation(
+						jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
+
+				final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
+				final IOManager ioManager = new IOManagerAsync();
+
+				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+				when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+				final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
+
+				final Executor executor = Executors.newCachedThreadPool();
+
+				Task task = new Task(
+						jobInformation,
+						taskInformation,
+						executionAttemptID,
+						slotAllocationId,
+						0,       // subtaskIndex
+						0,       // attemptNumber
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
+						0,       // targetSlotNumber
+						null,    // taskStateHandles,
+						memoryManager,
+						ioManager,
+						networkEnvironment,
+						new BroadcastVariableManager(),
+						new NoOpTaskManagerActions(),
+						new NoOpInputSplitProvider(),
+						new NoOpCheckpointResponder(),
+						new FallbackLibraryCacheManager(),
+						new FileCache(tmInfo.getTmpDirectories()),
+						tmInfo,
+						new UnregisteredTaskMetricsGroup(),
+						new NoOpResultPartitionConsumableNotifier(),
+						new NoOpPartitionProducerStateChecker(),
+						executor);
+
+				System.err.println("starting task thread");
+
+				task.startTaskThread();
+			}
+			catch (Throwable t) {
+				System.err.println("ERROR STARTING TASK");
+				t.printStackTrace();
+			}
+
+			System.err.println("parking the main thread");
+			CommonTestUtils.blockForeverNonInterruptibly();
+		}
+
+		public static final class OomInvokable extends AbstractInvokable {
+
+			@Override
+			public void invoke() throws Exception {
+				throw new OutOfMemoryError();
+			}
+		}
+
+		private static final class NoOpTaskManagerActions implements TaskManagerActions {
+
+			@Override
+			public void notifyFinalState(ExecutionAttemptID executionAttemptID) {}
+
+			@Override
+			public void notifyFatalError(String message, Throwable cause) {}
+
+			@Override
+			public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {}
+
+			@Override
+			public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
+		}
+
+		private static final class NoOpInputSplitProvider implements InputSplitProvider {
+
+			@Override
+			public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+				return null;
+			}
+		}
+
+		private static final class NoOpCheckpointResponder implements CheckpointResponder {
+
+			@Override
+			public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+			@Override
+			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
+		}
+
+		private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+			@Override
+			public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+		}
+
+		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+			@Override
+			public Future<ExecutionState> requestPartitionProducerState(
+					JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
index 0d0363f..4183152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -59,4 +59,10 @@ public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
 	public String[] getTmpDirectories() {
 		return tmpDirectories;
 	}
+
+	@Override
+	public boolean shouldExitJvmOnOutOfMemoryError() {
+		// never kill the JVM in tests
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
 	}
 
 	/**
+	 * Permanently blocks the current thread. The thread cannot be woken
+	 * up via {@link Thread#interrupt()}.
+	 */
+	public static void blockForeverNonInterruptibly() {
+		final Object lock = new Object();
+		//noinspection InfiniteLoopStatement
+		while (true) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (lock) {
+					lock.wait();
+				}
+			} catch (InterruptedException ignored) {}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Preconditions on the test environment
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, this throws a
 	 * {@link AssumptionViolatedException}, which causes JUnit to skip the test that
 	 * called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Manipulation of environment
+	// ------------------------------------------------------------------------
+
 	public static void setEnv(Map<String, String> newenv) {
 		setEnv(newenv, true);
 	}