You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 10:00:45 UTC
[2/2] flink git commit: [FLINK-5718] [core] TaskManagers exit the JVM
on fatal exceptions.
[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.
Manually applied and adapted commit dfc6fba5b9830e6a7804a6a0c9f69b36bf772730 for
the `release-1.2` branch.
This closes #3811.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa54691
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa54691
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa54691
Branch: refs/heads/release-1.2
Commit: daa54691255158b1fcd0a55193ae3766efd79b12
Parents: 852a710
Author: Matt Zimmer <zi...@netflix.com>
Authored: Tue May 2 16:46:13 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 09:28:51 2017 +0200
----------------------------------------------------------------------
docs/setup/config.md | 4 +-
.../flink/configuration/TaskManagerOptions.java | 5 +
.../org/apache/flink/util/ExceptionUtils.java | 37 +++
.../apache/flink/runtime/taskmanager/Task.java | 14 ++
.../taskmanager/TaskManagerRuntimeInfo.java | 32 ++-
.../flink/runtime/taskmanager/TaskManager.scala | 3 +-
.../taskmanager/TaskManagerConfiguration.scala | 8 +-
.../flink/runtime/testutils/TestJvmProcess.java | 9 +
.../runtime/util/JvmExitOnFatalErrorTest.java | 244 +++++++++++++++++++
.../flink/core/testutils/CommonTestUtils.java | 25 ++
10 files changed, 373 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index d5863a1..5f669e8 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m
- `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, it is advised that this configuration is also set to `true`. If this configuration is set to `false`, the allocated off-heap memory is only cleaned up when the limit set by the JVM parameter MaxDirectMemorySize is reached, which triggers a full GC. **Note:** For streaming setups, we highly recommend setting this value to `false`, as the core state backends currently do not use the managed memory.
### Memory and Performance Debugging
@@ -263,6 +263,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
- `taskmanager.max-registration-pause`: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **30 s**)
+- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an `OutOfMemoryError` (DEFAULT: **false**).
+
- `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**)
- `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 3bd15fe..05f670c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -34,6 +34,11 @@ public class TaskManagerOptions {
// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
+ /**
+  * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
+  * If enabled, the JVM is halted immediately instead of attempting a clean shutdown.
+  * Defaults to {@code false}.
+  */
+ public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+ key("taskmanager.jvm-exit-on-oom")
+ .defaultValue(false);
+
// ------------------------------------------------------------------------
// Network Options
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..8ec3d59 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * A collection of utility functions for dealing with exceptions and exception workflows.
+ */
@Internal
public final class ExceptionUtils {
+
+ /** The string used to represent a {@code null} exception reference. */
public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
/**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether the given exception indicates a situation that may leave the
+ * JVM in a corrupted state, meaning a state where continued normal operation can only be
+ * guaranteed via a clean process restart.
+ *
+ * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
+ * that the JVM is corrupted, namely {@link InternalError} and {@link UnknownError},
+ * including their subclasses such as {@link java.util.zip.ZipError} (which extends InternalError).
+ *
+ * <p>Other {@link VirtualMachineError}s, in particular {@link OutOfMemoryError} and
+ * {@link StackOverflowError}, are NOT classified as fatal by this method; see
+ * {@link #isJvmFatalOrOutOfMemoryError(Throwable)} for a variant that also covers
+ * {@link OutOfMemoryError}.
+ *
+ * @param t The exception to check; a {@code null} reference yields {@code false}.
+ * @return True, if the exception is considered fatal to the JVM, false otherwise.
+ */
+ public static boolean isJvmFatalError(Throwable t) {
+ return (t instanceof InternalError) || (t instanceof UnknownError);
+ }
+
+ /**
+ * Checks whether the given exception indicates a situation that may leave the
+ * JVM in a corrupted state, or is an out-of-memory error.
+ *
+ * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list of fatal JVM errors.
+ * This method additionally classifies the {@link OutOfMemoryError} as fatal, because it
+ * may occur in any thread (not necessarily the one that allocated most of the memory) and thus
+ * is often not recoverable by destroying the particular thread that threw the exception.
+ *
+ * @param t The exception to check; a {@code null} reference yields {@code false}.
+ * @return True, if the exception is fatal to the JVM or an OutOfMemoryError, false otherwise.
+ */
+ public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+ return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+ }
+
+ /**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 65b3053..8b51088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -698,6 +699,19 @@ public class Task implements Runnable, TaskActions {
// ----------------------------------------------------------------
try {
+ // check if the exception is unrecoverable
+ if (ExceptionUtils.isJvmFatalError(t) ||
+ (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+ {
+ // terminate the JVM immediately
+ // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
+ try {
+ LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
+ } finally {
+ Runtime.getRuntime().halt(-1);
+ }
+ }
+
// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel() or
// to failExternally()
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..041392b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -38,7 +39,10 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
/** list of temporary file directories */
private final String[] tmpDirectories;
-
+
+ /** Whether the JVM should be halted when the task thread throws an OutOfMemoryError. */
+ private final boolean exitJvmOnOutOfMemory;
+
/**
* Creates a runtime info.
*
@@ -49,18 +53,30 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
this(hostname, configuration, new String[] { tmpDirectory });
}
-
+
/**
* Creates a runtime info.
* @param hostname The host name of the interface that the TaskManager uses to communicate.
* @param configuration The configuration that the TaskManager was started with.
- * @param tmpDirectories The list of temporary file directories.
+ * @param tmpDirectories The list of temporary file directories.
*/
public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
+ // The OutOfMemoryError policy is read from 'taskmanager.jvm-exit-on-oom' (default: false).
+ // NOTE(review): 'configuration' is dereferenced here, before the delegated constructor's
+ // checkNotNull(configuration) runs, so a null configuration fails with a bare NPE.
+ this(hostname, configuration, tmpDirectories, configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY));
+ }
+
+ /**
+ * Creates a runtime info with an explicitly given OutOfMemoryError policy. The policy is
+ * NOT read from {@code configuration} by this constructor.
+ *
+ * @param hostname The host name of the interface that the TaskManager uses to communicate.
+ * @param configuration The configuration that the TaskManager was started with.
+ * @param tmpDirectories The list of temporary file directories; must contain at least one
+ *                       entry. The array is stored as-is, without copying.
+ * @param exitJvmOnOutOfMemory True to terminate the JVM on an OutOfMemoryError, false otherwise.
+ * @throws IllegalArgumentException if {@code tmpDirectories} is empty.
+ * @throws NullPointerException if {@code hostname}, {@code configuration} or
+ *                              {@code tmpDirectories} is null.
+ */
+ public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories, boolean exitJvmOnOutOfMemory) {
checkArgument(tmpDirectories.length > 0);
this.hostname = checkNotNull(hostname);
this.configuration = checkNotNull(configuration);
this.tmpDirectories = tmpDirectories;
+ this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
}
@@ -87,4 +103,14 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
public String[] getTmpDirectories() {
return tmpDirectories;
}
+
+ /**
+ * Checks whether the TaskManager should exit the JVM when the task thread throws
+ * an OutOfMemoryError.
+ *
+ * <p>The value is either passed explicitly to the constructor or, for the constructors
+ * without that argument, read from the {@code taskmanager.jvm-exit-on-oom} option.
+ *
+ * @return True to terminate the JVM on an OutOfMemoryError, false otherwise.
+ */
+ public boolean shouldExitJvmOnOutOfMemoryError() {
+ return exitJvmOnOutOfMemory;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bc63655..bb93fa1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2382,7 +2382,8 @@ object TaskManager {
configuration,
initialRegistrationPause,
maxRegistrationPause,
- refusedRegistrationPause)
+ refusedRegistrationPause,
+ configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
(taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index aab3c5f..929ff55 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager
import java.util.concurrent.TimeUnit
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
import scala.concurrent.duration.FiniteDuration
@@ -33,7 +33,8 @@ case class TaskManagerConfiguration(
configuration: Configuration,
initialRegistrationPause: FiniteDuration,
maxRegistrationPause: FiniteDuration,
- refusedRegistrationPause: FiniteDuration) {
+ refusedRegistrationPause: FiniteDuration,
+ exitJvmOnOutOfMemory: Boolean) {
def this(
tmpDirPaths: Array[String],
@@ -51,6 +52,7 @@ case class TaskManagerConfiguration(
configuration,
FiniteDuration(500, TimeUnit.MILLISECONDS),
FiniteDuration(30, TimeUnit.SECONDS),
- FiniteDuration(10, TimeUnit.SECONDS))
+ FiniteDuration(10, TimeUnit.SECONDS),
+ configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
}
}
+ /**
+ * Blocks until the started process has terminated. There is no timeout.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while waiting.
+ * @throws IllegalStateException if no process is currently set (e.g. it was never started).
+ */
+ public void waitFor() throws InterruptedException {
+ // read the field once so that the null check and the call operate on the same reference
+ Process process = this.process;
+ if (process != null) {
+ process.waitFor();
+ } else {
+ throw new IllegalStateException("process not started");
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
// File based synchronization utilities
// ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..bf75549
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.*;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that a task halts the TaskManager JVM when its task thread throws an
+ * {@link OutOfMemoryError} while {@link TaskManagerOptions#KILL_ON_OUT_OF_MEMORY} is enabled.
+ *
+ * <p>The task runs in a separate JVM (see {@link ProcessEntryPoint}). If that JVM is not
+ * halted, its main thread stays parked forever and this test hangs.
+ */
+public class JvmExitOnFatalErrorTest {
+
+ @Test
+ public void testExitJvmOnOutOfMemory() throws Exception {
+ // this test works only on linux
+ assumeTrue(OperatingSystem.isLinux());
+
+ // this test leaves remaining processes if not executed with Java 8
+ CommonTestUtils.assumeJava8();
+
+ // to check what went wrong (when the test hangs) uncomment this line
+// ProcessEntryPoint.main(new String[0]);
+
+ final KillOnFatalErrorProcess testProcess = new KillOnFatalErrorProcess();
+
+ // NOTE(review): the child's exit code is not asserted and waitFor() has no timeout;
+ // a missing JVM halt therefore shows up as a hanging test rather than a failure
+ try {
+ testProcess.startProcess();
+ testProcess.waitFor();
+ }
+ finally {
+ testProcess.destroy();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Test Process Implementation
+ // ------------------------------------------------------------------------
+
+ /** Launches {@link ProcessEntryPoint} in a separate JVM without extra JVM arguments. */
+ private static final class KillOnFatalErrorProcess extends TestJvmProcess {
+
+ public KillOnFatalErrorProcess() throws Exception {}
+
+ @Override
+ public String getName() {
+ return "KillOnFatalErrorProcess";
+ }
+
+ @Override
+ public String[] getJvmArgs() {
+ return new String[0];
+ }
+
+ @Override
+ public String getEntryPointClassName() {
+ return ProcessEntryPoint.class.getName();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Entry point of the spawned JVM: starts a {@link Task} whose invokable throws an
+ * {@link OutOfMemoryError} with {@link TaskManagerOptions#KILL_ON_OUT_OF_MEMORY} enabled,
+ * then parks the main thread. The process is expected to end only through the JVM halt
+ * triggered by the task.
+ */
+ public static final class ProcessEntryPoint {
+
+ public static void main(String[] args) throws Exception {
+
+ System.err.println("creating task");
+
+ // catch everything during setup: an error here must not terminate the process (which
+ // would look like a successful JVM halt); instead it is printed and the main thread
+ // is parked below, so the test hangs rather than passing accidentally
+ try {
+ final Configuration taskManagerConfig = new Configuration();
+ taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+ final JobID jid = new JobID();
+ final JobVertexID jobVertexId = new JobVertexID();
+ final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+ // NOTE(review): slotAllocationId is not used anywhere below
+ final AllocationID slotAllocationId = new AllocationID();
+
+ final SerializedValue<ExecutionConfig> execConfig = new SerializedValue<>(new ExecutionConfig());
+
+ final JobInformation jobInformation = new JobInformation(
+ jid, "Test Job", execConfig, new Configuration(),
+ Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+
+ final TaskInformation taskInformation = new TaskInformation(
+ jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
+
+ final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
+ final IOManager ioManager = new IOManagerAsync();
+
+ final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+ when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+ // split the configured tmp dir setting on ',' or the platform path separator
+ final String[] tmpDirPaths = taskManagerConfig.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+ // this constructor reads KILL_ON_OUT_OF_MEMORY (set to true above) from the configuration
+ final TaskManagerRuntimeInfo tmInfo = new TaskManagerRuntimeInfo("test", taskManagerConfig, tmpDirPaths);
+
+ final Executor executor = Executors.newCachedThreadPool();
+
+ Task task = new Task(
+ jobInformation,
+ taskInformation,
+ executionAttemptID,
+ 0, // subtaskIndex
+ 0, // attemptNumber
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0, // targetSlotNumber
+ null, // taskStateHandles,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ new BroadcastVariableManager(),
+ new ActorGatewayTaskManagerConnection(new DummyActorGateway()),
+ new NoOpInputSplitProvider(),
+ new NoOpCheckpointResponder(),
+ new FallbackLibraryCacheManager(),
+ new FileCache(taskManagerConfig),
+ tmInfo,
+ new UnregisteredTaskMetricsGroup(),
+ new NoOpResultPartitionConsumableNotifier(),
+ new NoOpPartitionProducerStateChecker(),
+ executor);
+
+ System.err.println("starting task thread");
+
+ task.startTaskThread();
+ }
+ catch (Throwable t) {
+ System.err.println("ERROR STARTING TASK");
+ t.printStackTrace();
+ }
+
+ System.err.println("parking the main thread");
+ CommonTestUtils.blockForeverNonInterruptibly();
+ }
+
+ /** Invokable that immediately throws an {@link OutOfMemoryError} from the task thread. */
+ public static final class OomInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ throw new OutOfMemoryError();
+ }
+ }
+
+ // no-op stubs for task collaborators that this test does not exercise
+
+ private static final class NoOpInputSplitProvider implements InputSplitProvider {
+
+ @Override
+ public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+ return null;
+ }
+ }
+
+ private static final class NoOpCheckpointResponder implements CheckpointResponder {
+
+ @Override
+ public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+ @Override
+ public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
+ }
+
+ private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+ @Override
+ public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+ }
+
+ private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+ @Override
+ public Future<ExecutionState> requestPartitionProducerState(
+ JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
}
/**
+ * Permanently blocks the current thread. The thread cannot be woken
+ * up via {@link Thread#interrupt()}; this method never returns normally.
+ *
+ * <p>Typical use is parking the main thread of a spawned test process that is expected
+ * to be terminated from elsewhere (e.g. by a JVM halt).
+ */
+ public static void blockForeverNonInterruptibly() {
+ // a lock that is never shared, so no notify() can ever target it
+ final Object lock = new Object();
+ // loop forever: spurious wakeups fall through to wait() again, and interrupts are
+ // deliberately swallowed (the thread's interrupt flag is not restored)
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (lock) {
+ lock.wait();
+ }
+ } catch (InterruptedException ignored) {}
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Preconditions on the test environment
+ // ------------------------------------------------------------------------
+
+ /**
* Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, this throws a
* {@link AssumptionViolatedException}, which causes JUnit to skip the test that
* called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
}
}
+ // ------------------------------------------------------------------------
+ // Manipulation of environment
+ // ------------------------------------------------------------------------
+
public static void setEnv(Map<String, String> newenv) {
setEnv(newenv, true);
}