You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/27 15:41:27 UTC
[2/2] flink git commit: [FLINK-4715] Fail TaskManager with fatal
error if task cancellation is stuck
[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck
- Splits the cancellation up into two threads:
* The `TaskCanceler` calls `cancel` on the invokable and `interrupt`
on the executing Thread. It then exits.
* The `TaskCancelerWatchDog` kicks in after the task cancellation
interval (current default: 30 secs) and periodically calls `interrupt`
on the executing Thread. If the Thread does not terminate within
the task cancellation timeout (new config value, default 3 mins), the task
manager is notified about a fatal error, leading to termination of the JVM.
- The new configuration is exposed via `TaskManagerOptions.TASK_CANCELLATION_TIMEOUT`
(default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval).
This closes #2652.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27fd2493
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27fd2493
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27fd2493
Branch: refs/heads/master
Commit: 27fd2493e98e90bf904f0b6b7424fd4213b04977
Parents: d2168b6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 18 09:50:36 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 27 17:41:00 2016 +0200
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 45 ++-
.../flink/configuration/ConfigConstants.java | 9 +-
.../flink/configuration/TaskManagerOptions.java | 64 +++++
.../apache/flink/runtime/taskmanager/Task.java | 153 +++++++++--
...TaskManagerProcessReapingFatalErrorTest.java | 40 +++
.../TaskManagerProcessReapingTest.java | 241 +---------------
.../TaskManagerProcessReapingTestBase.java | 275 +++++++++++++++++++
.../flink/runtime/taskmanager/TaskTest.java | 253 ++++++++++++++++-
8 files changed, 796 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3daf9cf..3cde5e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -19,11 +19,10 @@
package org.apache.flink.api.common;
import com.esotericsoftware.kryo.Serializer;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.Preconditions;
-
+import org.apache.flink.configuration.TaskManagerOptions;
import java.io.Serializable;
import java.util.Collections;
@@ -32,6 +31,8 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* A config to define the behavior of the program execution. It allows to define (among other
* options) the following settings:
@@ -135,6 +136,12 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private long taskCancellationIntervalMillis = -1;
+ /**
+ * Timeout after which an ongoing task cancellation will lead to a fatal
+ * TaskManager error, usually killing the JVM.
+ */
+ private long taskCancellationTimeoutMillis = -1;
+
// ------------------------------- User code values --------------------------------------------
private GlobalJobParameters globalJobParameters;
@@ -305,7 +312,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public void setMaxParallelism(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+ checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
this.maxParallelism = maxParallelism;
}
@@ -327,6 +334,36 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
/**
+ * Returns the timeout (in milliseconds) after which an ongoing task
+ * cancellation leads to a fatal TaskManager error.
+ *
+ * <p>The value <code>0</code> means that the timeout is disabled. In
+ * this case a stuck cancellation will not lead to a fatal error.
+ */
+ @PublicEvolving
+ public long getTaskCancellationTimeout() {
+ return this.taskCancellationTimeoutMillis;
+ }
+
+ /**
+ * Sets the timeout (in milliseconds) after which an ongoing task cancellation
+ * is considered failed, leading to a fatal TaskManager error.
+ *
+ * <p>The cluster default is configured via {@link TaskManagerOptions#TASK_CANCELLATION_TIMEOUT}.
+ *
+ * <p>The value <code>0</code> disables the timeout. In this case a stuck
+ * cancellation will not lead to a fatal error.
+ *
+ * @param timeout The task cancellation timeout (in milliseconds).
+ */
+ @PublicEvolving
+ public ExecutionConfig setTaskCancellationTimeout(long timeout) {
+ checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
+ this.taskCancellationTimeoutMillis = timeout;
+ return this;
+ }
+
+ /**
* Sets the restart strategy to be used for recovery.
*
* <pre>{@code
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9061e87..0561fd7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -273,9 +273,10 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
/**
- * Time interval between two successive task cancellation attempts in milliseconds.
+ * Deprecated. Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}.
*/
@PublicEvolving
+ @Deprecated
public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval";
// --------------------------- Runtime Algorithms -------------------------------
@@ -948,7 +949,6 @@ public final class ConfigConstants {
@Deprecated
public static final String SAVEPOINT_FS_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
-
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
@@ -1086,8 +1086,9 @@ public final class ConfigConstants {
public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;
/**
- * The default interval (in milliseconds) to wait between consecutive task cancellation attempts (= 30000 msec).
- * */
+ * Deprecated. Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}.
+ */
+ @Deprecated
public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000;
// ------------------------ Runtime Algorithms ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
new file mode 100644
index 0000000..0f60a4d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to TaskManager and Task settings.
+ */
+@PublicEvolving
+public class TaskManagerOptions {
+
+ // ------------------------------------------------------------------------
+ // TaskManager Options
+ // ------------------------------------------------------------------------
+
+ // @TODO Migrate 'taskmanager.*' config options from ConfigConstants
+
+ // ------------------------------------------------------------------------
+ // Task Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Time interval in milliseconds between two successive task cancellation
+ * attempts.
+ */
+ public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
+ key("task.cancellation.interval")
+ .defaultValue(30000L)
+ .withDeprecatedKeys("task.cancellation-interval");
+
+ /**
+ * Timeout in milliseconds after which a task cancellation times out and
+ * leads to a fatal TaskManager error. A value of <code>0</code> deactivates
+ * the watch dog.
+ */
+ public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
+ key("task.cancellation.timeout")
+ .defaultValue(180000L);
+
+ // ------------------------------------------------------------------------
+
+ /** Not intended to be instantiated */
+ private TaskManagerOptions() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd522bd..f09e88a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.taskmanager;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
@@ -64,6 +65,7 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
@@ -75,6 +77,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -236,6 +239,9 @@ public class Task implements Runnable, TaskActions {
/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationInterval;
+ /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
+ private long taskCancellationTimeout;
+
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
@@ -270,9 +276,9 @@ public class Task implements Runnable, TaskActions {
this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
this.taskStateHandles = tdd.getTaskStateHandles();
- this.taskCancellationInterval = jobConfiguration.getLong(
- ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
- ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
+ Configuration taskConfig = tdd.getTaskConfiguration();
+ this.taskCancellationInterval = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
+ this.taskCancellationTimeout = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
@@ -401,6 +407,16 @@ public class Task implements Runnable, TaskActions {
return executingThread;
}
+ @VisibleForTesting
+ long getTaskCancellationInterval() {
+ return taskCancellationInterval;
+ }
+
+ @VisibleForTesting
+ long getTaskCancellationTimeout() {
+ return taskCancellationTimeout;
+ }
+
// ------------------------------------------------------------------------
// Task Execution
// ------------------------------------------------------------------------
@@ -498,6 +514,11 @@ public class Task implements Runnable, TaskActions {
taskCancellationInterval = executionConfig.getTaskCancellationInterval();
}
+ if (executionConfig.getTaskCancellationTimeout() >= 0) {
+ // override task cancellation timeout from Flink config if set in ExecutionConfig
+ taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
+ }
+
// now load the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
@@ -568,7 +589,6 @@ public class Task implements Runnable, TaskActions {
taskStateHandles = null;
}
-
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
@@ -876,12 +896,16 @@ public class Task implements Runnable, TaskActions {
// because the canceling may block on user code, we cancel from a separate thread
// we do not reuse the async call handler, because that one may be blocked, in which
// case the canceling could not continue
+
+ // The canceller calls cancel and interrupts the executing thread once
Runnable canceler = new TaskCanceler(
LOG,
invokable,
executingThread,
taskNameWithSubtask,
taskCancellationInterval,
+ taskCancellationTimeout,
+ taskManagerConnection,
producedPartitions,
inputGates);
Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
@@ -1171,16 +1195,30 @@ public class Task implements Runnable, TaskActions {
private final AbstractInvokable invokable;
private final Thread executer;
private final String taskName;
- private final long taskCancellationIntervalMillis;
private final ResultPartition[] producedPartitions;
private final SingleInputGate[] inputGates;
+ /** Interrupt interval. */
+ private final long interruptInterval;
+
+ /** Timeout after which a fatal error notification happens. */
+ private final long interruptTimeout;
+
+ /** TaskManager to notify about a timeout */
+ private final TaskManagerConnection taskManager;
+
+ /** Watch Dog thread */
+ @Nullable
+ private final Thread watchDogThread;
+
public TaskCanceler(
Logger logger,
AbstractInvokable invokable,
Thread executer,
String taskName,
- long cancelationInterval,
+ long cancellationInterval,
+ long cancellationTimeout,
+ TaskManagerConnection taskManager,
ResultPartition[] producedPartitions,
SingleInputGate[] inputGates) {
@@ -1188,26 +1226,46 @@ public class Task implements Runnable, TaskActions {
this.invokable = invokable;
this.executer = executer;
this.taskName = taskName;
- this.taskCancellationIntervalMillis = cancelationInterval;
+ this.interruptInterval = cancellationInterval;
+ this.interruptTimeout = cancellationTimeout;
+ this.taskManager = taskManager;
this.producedPartitions = producedPartitions;
this.inputGates = inputGates;
+
+ if (cancellationTimeout > 0) {
+ // The watch dog repeatedly interrupts the executor until
+ // the cancellation timeout kicks in (at which point the
+ // task manager is notified about a fatal error) or the
+ // executor has terminated.
+ this.watchDogThread = new Thread(
+ executer.getThreadGroup(),
+ new TaskCancelerWatchDog(),
+ "WatchDog for " + taskName + " cancellation");
+ this.watchDogThread.setDaemon(true);
+ } else {
+ this.watchDogThread = null;
+ }
}
@Override
public void run() {
try {
+ if (watchDogThread != null) {
+ watchDogThread.start();
+ }
+
// the user-defined cancel method may throw errors.
// we need do continue despite that
try {
invokable.cancel();
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
logger.error("Error while canceling the task", t);
}
// Early release of input and output buffer pools. We do this
// in order to unblock async Threads, which produce/consume the
- // intermediate streams outside of the main Task Thread.
+ // intermediate streams outside of the main Task Thread (like
+ // the Kafka consumer).
//
// Don't do this before cancelling the invokable. Otherwise we
// will get misleading errors in the logs.
@@ -1230,16 +1288,46 @@ public class Task implements Runnable, TaskActions {
// interrupt the running thread initially
executer.interrupt();
try {
- executer.join(taskCancellationIntervalMillis);
+ executer.join(interruptInterval);
}
catch (InterruptedException e) {
// we can ignore this
}
- // it is possible that the user code does not react immediately. for that
- // reason, we spawn a separate thread that repeatedly interrupts the user code until
- // it exits
+ if (watchDogThread != null) {
+ watchDogThread.interrupt();
+ watchDogThread.join();
+ }
+ } catch (Throwable t) {
+ logger.error("Error in the task canceler", t);
+ }
+ }
+
+ /**
+ * Watchdog for the cancellation. If the task is stuck in cancellation,
+ * we notify the task manager about a fatal error.
+ */
+ private class TaskCancelerWatchDog implements Runnable {
+
+ @Override
+ public void run() {
+ long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+ long deadline = System.nanoTime() + timeoutNanos;
+
+ try {
+ // Initial wait before interrupting periodically
+ Thread.sleep(interruptInterval);
+ } catch (InterruptedException ignored) {
+ }
+
+ // It is possible that the user code does not react to the task canceller.
+ // For that reason, we spawn this separate thread that repeatedly interrupts
+ // the user code until it exits. If the user code does not exit within
+ // the timeout, we notify the task manager about a fatal error.
while (executer.isAlive()) {
+ long now = System.nanoTime();
+
// build the stack trace of where the thread is stuck, for the log
StringBuilder bld = new StringBuilder();
StackTraceElement[] stack = executer.getStackTrace();
@@ -1247,21 +1335,34 @@ public class Task implements Runnable, TaskActions {
bld.append(e).append('\n');
}
- logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
- taskName, bld.toString());
+ if (now >= deadline) {
+ long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+ String msg = String.format("Task '%s' did not react to cancelling signal in " +
+ "the last %d seconds, but is stuck in method:\n %s",
+ taskName,
+ duration,
+ bld.toString());
- executer.interrupt();
- try {
- executer.join(taskCancellationIntervalMillis);
- }
- catch (InterruptedException e) {
- // we can ignore this
+ taskManager.notifyFatalError(msg, null);
+
+ return; // done, don't forget to leave the loop
+ } else {
+ logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
+ taskName, bld.toString());
+
+ executer.interrupt();
+ try {
+ long timeLeftNanos = Math.min(intervalNanos, deadline - now);
+ long timeLeftMillis = TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS);
+
+ if (timeLeftMillis > 0) {
+ executer.join(timeLeftMillis);
+ }
+ } catch (InterruptedException ignored) {
+ }
}
}
}
- catch (Throwable t) {
- logger.error("Error in the task canceler", t);
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
new file mode 100644
index 0000000..1f0e84d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager receives a fatal error message.
+ */
+public class TaskManagerProcessReapingFatalErrorTest extends TaskManagerProcessReapingTestBase {
+
+ @Override
+ void onTaskManagerProcessRunning(ActorRef taskManager) {
+ taskManager.tell(new TaskManagerMessages.FatalError("ouch", null), ActorRef.noSender());
+ }
+
+ @Override
+ void onTaskManagerProcessTerminated(String processOutput) {
+ assertTrue("Did not log expected message", processOutput.contains("ouch"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 85d6ede..8aed021 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -19,248 +19,17 @@
package org.apache.flink.runtime.taskmanager;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Test;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
-
/**
* Tests that the TaskManager process properly exits when the TaskManager actor dies.
*/
-public class TaskManagerProcessReapingTest {
-
- @Test
- public void testReapProcessOnFailure() {
- Process taskManagerProcess = null;
- ActorSystem jmActorSystem = null;
-
- final StringWriter processOutput = new StringWriter();
-
- try {
- String javaCommand = getJavaCommandPath();
-
- // check that we run this test only if the java command
- // is available on this machine
- if (javaCommand == null) {
- System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----");
- return;
- }
-
- // create a logging file for the process
- File tempLogFile = File.createTempFile("testlogconfig", "properties");
- tempLogFile.deleteOnExit();
- CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
- final InetAddress localhost = InetAddress.getByName("localhost");
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
- jmActorSystem = AkkaUtils.createActorSystem(
- new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
-
- ActorRef jmActor = JobManager.startJobManagerActors(
- new Configuration(),
- jmActorSystem,
- JobManager.class,
- MemoryArchivist.class)._1;
-
- // start a ResourceManager
- StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
- new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
- FlinkResourceManager.startResourceManagerActors(
- new Configuration(),
- jmActorSystem,
- standaloneLeaderRetrievalService,
- StandaloneResourceManager.class);
-
- final int taskManagerPort = NetUtils.getAvailablePort();
-
- // start the task manager process
- String[] command = new String[] {
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
- "-Xms256m", "-Xmx256m",
- "-classpath", getCurrentClasspath(),
- TaskManagerTestEntryPoint.class.getName(),
- String.valueOf(jobManagerPort), String.valueOf(taskManagerPort)
- };
-
- ProcessBuilder bld = new ProcessBuilder(command);
- taskManagerProcess = bld.start();
- new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
-
- // grab the reference to the TaskManager. try multiple times, until the process
- // is started and the TaskManager is up
- String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
- org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
- TaskManager.TASK_MANAGER_NAME());
-
- ActorRef taskManagerRef = null;
- Throwable lastError = null;
- for (int i = 0; i < 40; i++) {
- try {
- taskManagerRef = TaskManager.getTaskManagerRemoteReference(
- taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
- break;
- }
- catch (Throwable t) {
- // TaskManager probably not ready yet
- lastError = t;
- }
- Thread.sleep(500);
- }
+public class TaskManagerProcessReapingTest extends TaskManagerProcessReapingTestBase {
- assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess));
-
- if (taskManagerRef == null) {
- if (lastError != null) {
- lastError.printStackTrace();
- }
- fail("TaskManager process did not launch the TaskManager properly. Failed to look up "
- + taskManagerActorName);
- }
-
- // kill the TaskManager actor
- taskManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
- // wait for max 5 seconds for the process to terminate
- {
- long now = System.currentTimeMillis();
- long deadline = now + 10000;
-
- while (now < deadline && isProcessAlive(taskManagerProcess)) {
- Thread.sleep(100);
- now = System.currentTimeMillis();
- }
- }
-
- assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess));
-
- int returnCode = taskManagerProcess.exitValue();
- assertEquals("TaskManager died, but not because of the process reaper",
- TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
- }
- catch (Exception e) {
- e.printStackTrace();
- printProcessLog(processOutput.toString());
- fail(e.getMessage());
- }
- catch (Error e) {
- e.printStackTrace();
- printProcessLog(processOutput.toString());
- throw e;
- }
- finally {
- if (taskManagerProcess != null) {
- taskManagerProcess.destroy();
- }
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
- }
+ @Override
+ void onTaskManagerProcessRunning(ActorRef taskManager) {
+ // kill the TaskManager actor
+ taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- private static void printProcessLog(String log) {
- System.out.println("-----------------------------------------");
- System.out.println(" BEGIN SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- System.out.println(log);
- System.out.println("-----------------------------------------");
- System.out.println(" END SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- }
-
- // --------------------------------------------------------------------------------------------
-
- public static class TaskManagerTestEntryPoint {
-
- public static void main(String[] args) {
- try {
- int jobManagerPort = Integer.parseInt(args[0]);
- int taskManagerPort = Integer.parseInt(args[1]);
-
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
-
- TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
-
- // wait forever
- Object lock = new Object();
- synchronized (lock) {
- lock.wait();
- }
- }
- catch (Throwable t) {
- System.exit(1);
- }
- }
- }
-
- private static class PipeForwarder extends Thread {
-
- private final StringWriter target;
- private final InputStream source;
-
- public PipeForwarder(InputStream source, StringWriter target) {
- super("Pipe Forwarder");
- setDaemon(true);
-
- this.source = source;
- this.target = target;
-
- start();
- }
-
- @Override
- public void run() {
- try {
- int next;
- while ((next = source.read()) != -1) {
- target.write(next);
- }
- }
- catch (IOException e) {
- // terminate
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
new file mode 100644
index 0000000..c7913f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.junit.Test;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager actor dies.
+ */
+public abstract class TaskManagerProcessReapingTestBase {
+
+ /**
+ * Called after the task manager has been started up. After calling this
+ * method, the test base checks that the process exits.
+ */
+ abstract void onTaskManagerProcessRunning(ActorRef taskManager);
+
+ /**
+ * Called after the task manager has successfully terminated.
+ */
+ void onTaskManagerProcessTerminated(String processOutput) {
+ // Default does nothing
+ }
+
+ @Test
+ public void testReapProcessOnFailure() {
+ Process taskManagerProcess = null;
+ ActorSystem jmActorSystem = null;
+
+ final StringWriter processOutput = new StringWriter();
+
+ try {
+ String javaCommand = getJavaCommandPath();
+
+ // check that we run this test only if the java command
+ // is available on this machine
+ if (javaCommand == null) {
+ System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----");
+ return;
+ }
+
+ // create a logging file for the process
+ File tempLogFile = File.createTempFile("testlogconfig", "properties");
+ tempLogFile.deleteOnExit();
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ final InetAddress localhost = InetAddress.getByName("localhost");
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ // start a JobManager
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+ jmActorSystem = AkkaUtils.createActorSystem(
+ new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
+
+ ActorRef jmActor = JobManager.startJobManagerActors(
+ new Configuration(),
+ jmActorSystem,
+ JobManager.class,
+ MemoryArchivist.class)._1;
+
+ // start a ResourceManager
+ StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
+ new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
+
+ FlinkResourceManager.startResourceManagerActors(
+ new Configuration(),
+ jmActorSystem,
+ standaloneLeaderRetrievalService,
+ StandaloneResourceManager.class);
+
+ final int taskManagerPort = NetUtils.getAvailablePort();
+
+ // start the task manager process
+ String[] command = new String[] {
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+ "-Xms256m", "-Xmx256m",
+ "-classpath", getCurrentClasspath(),
+ TaskManagerTestEntryPoint.class.getName(),
+ String.valueOf(jobManagerPort), String.valueOf(taskManagerPort)
+ };
+
+ ProcessBuilder bld = new ProcessBuilder(command);
+ taskManagerProcess = bld.start();
+ new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
+
+ // grab the reference to the TaskManager. try multiple times, until the process
+ // is started and the TaskManager is up
+ String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+ org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+ TaskManager.TASK_MANAGER_NAME());
+
+ ActorRef taskManagerRef = null;
+ Throwable lastError = null;
+ for (int i = 0; i < 40; i++) {
+ try {
+ taskManagerRef = TaskManager.getTaskManagerRemoteReference(
+ taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+ break;
+ }
+ catch (Throwable t) {
+ // TaskManager probably not ready yet
+ lastError = t;
+ }
+ Thread.sleep(500);
+ }
+
+ assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess));
+
+ if (taskManagerRef == null) {
+ if (lastError != null) {
+ lastError.printStackTrace();
+ }
+ fail("TaskManager process did not launch the TaskManager properly. Failed to look up "
+ + taskManagerActorName);
+ }
+
+ // let the concrete test trigger the TaskManager failure (e.g. kill the actor)
+ onTaskManagerProcessRunning(taskManagerRef);
+
+ // wait for max 10 seconds for the process to terminate
+ {
+ long now = System.currentTimeMillis();
+ long deadline = now + 10000;
+
+ while (now < deadline && isProcessAlive(taskManagerProcess)) {
+ Thread.sleep(100);
+ now = System.currentTimeMillis();
+ }
+ }
+
+ assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess));
+
+ int returnCode = taskManagerProcess.exitValue();
+ assertEquals("TaskManager died, but not because of the process reaper",
+ TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+
+ onTaskManagerProcessTerminated(processOutput.toString());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ printProcessLog(processOutput.toString());
+ fail(e.getMessage());
+ }
+ catch (Error e) {
+ e.printStackTrace();
+ printProcessLog(processOutput.toString());
+ throw e;
+ }
+ finally {
+ if (taskManagerProcess != null) {
+ taskManagerProcess.destroy();
+ }
+ if (jmActorSystem != null) {
+ jmActorSystem.shutdown();
+ }
+ }
+ }
+
+ private static void printProcessLog(String log) {
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class TaskManagerTestEntryPoint {
+
+ public static void main(String[] args) {
+ try {
+ int jobManagerPort = Integer.parseInt(args[0]);
+ int taskManagerPort = Integer.parseInt(args[1]);
+
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+
+ TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
+
+ // wait forever
+ Object lock = new Object();
+ synchronized (lock) {
+ lock.wait();
+ }
+ }
+ catch (Throwable t) {
+ System.exit(1);
+ }
+ }
+ }
+
+ private static class PipeForwarder extends Thread {
+
+ private final StringWriter target;
+ private final InputStream source;
+
+ public PipeForwarder(InputStream source, StringWriter target) {
+ super("Pipe Forwarder");
+ setDaemon(true);
+
+ this.source = source;
+ this.target = target;
+
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ int next;
+ while ((next = source.read()) != -1) {
+ target.write(next);
+ }
+ }
+ catch (IOException e) {
+ // terminate
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27fd2493/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 9a13cde..c38d23a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -45,10 +46,12 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -87,11 +90,12 @@ import static org.mockito.Mockito.when;
* execution listener, which simply put the messages in a queue to be picked
* up by the test and validated.
*/
-public class TaskTest {
+public class TaskTest extends TestLogger {
private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
-
+ private static OneShotLatch cancelLatch;
+
private ActorGateway taskManagerGateway;
private ActorGateway jobManagerGateway;
private ActorGateway listenerGateway;
@@ -117,6 +121,7 @@ public class TaskTest {
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+ cancelLatch = new OneShotLatch();
}
@After
@@ -565,6 +570,123 @@ public class TaskTest {
verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
+ /**
+ * Tests that interrupt happens via watch dog if canceller is stuck in cancel.
+ * Task cancellation blocks the task canceller. Interrupt after cancel via
+ * cancellation watch dog.
+ */
+ @Test
+ public void testWatchDogInterruptsTask() throws Exception {
+ Configuration config = new Configuration();
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50);
+
+ Task task = createTask(InvokableBlockingInCancel.class, config);
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ // No fatal error
+ for (Object msg : taskManagerMessages) {
+ assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+ }
+ }
+
+ /**
+ * The invoke() method holds a lock (trigger awaitLatch after acquisition)
+ * and cancel cannot complete because it also tries to acquire the same lock.
+ * This is resolved by the watch dog, no fatal error.
+ */
+ @Test
+ public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
+ Configuration config = new Configuration();
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50);
+
+ Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ // No fatal error
+ for (Object msg : taskManagerMessages) {
+ assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError);
+ }
+ }
+
+ /**
+ * The invoke() method blocks indefinitely and swallows interrupts, while cancel()
+ * returns immediately. Only resolved by a fatal error.
+ */
+ @Test
+ public void testFatalErrorAfterUninterruptibleInvoke() throws Exception {
+ Configuration config = new Configuration();
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50);
+
+ Task task = createTask(InvokableUninterruptibleBlockingInvoke.class, config);
+
+ try {
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ task.cancelExecution();
+
+ for (int i = 0; i < 10; i++) {
+ Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
+ if (msg instanceof TaskManagerMessages.FatalError) {
+ return; // success
+ }
+ }
+
+ fail("Did not receive expected task manager message");
+ } finally {
+ // Interrupt again to clean up Thread
+ cancelLatch.trigger();
+ task.getExecutingThread().interrupt();
+ task.getExecutingThread().join();
+ }
+ }
+
+ /**
+ * Tests that the task configuration is respected and overwritten by the execution config.
+ */
+ @Test
+ public void testTaskConfig() throws Exception {
+ long interval = 28218123;
+ long timeout = interval + 19292;
+
+ Configuration config = new Configuration();
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), interval);
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), timeout);
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setTaskCancellationInterval(interval + 1337);
+ executionConfig.setTaskCancellationTimeout(timeout - 1337);
+
+ Task task = createTask(InvokableBlockingInInvoke.class, config, executionConfig);
+
+ assertEquals(interval, task.getTaskCancellationInterval());
+ assertEquals(timeout, task.getTaskCancellationTimeout());
+
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ assertEquals(executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
+ assertEquals(executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout());
+
+ task.getExecutingThread().interrupt();
+ task.getExecutingThread().join();
+ }
+
// ------------------------------------------------------------------------
private void setInputGate(Task task, SingleInputGate inputGate) {
@@ -597,13 +719,33 @@ public class TaskTest {
}
private Task createTask(Class<? extends AbstractInvokable> invokable) {
+ return createTask(invokable, new Configuration(), new ExecutionConfig());
+ }
+
+ private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) {
+ LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+ when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+ return createTask(invokable, libCache, config, new ExecutionConfig());
+ }
+
+ private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) {
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, libCache);
+ return createTask(invokable, libCache, config, execConfig);
+ }
+
+ private Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ LibraryCacheManager libCache) {
+
+ return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
}
- private Task createTask(Class<? extends AbstractInvokable> invokable,
- LibraryCacheManager libCache) {
+ private Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ LibraryCacheManager libCache,
+ Configuration config,
+ ExecutionConfig execConfig) {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
@@ -615,7 +757,17 @@ public class TaskTest {
when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
- return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor);
+ return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor, config, execConfig);
+ }
+
+ private Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ LibraryCacheManager libCache,
+ NetworkEnvironment networkEnvironment,
+ ResultPartitionConsumableNotifier consumableNotifier,
+ PartitionStateChecker partitionStateChecker,
+ Executor executor) {
+ return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionStateChecker, executor, new Configuration(), new ExecutionConfig());
}
private Task createTask(
@@ -624,9 +776,11 @@ public class TaskTest {
NetworkEnvironment networkEnvironment,
ResultPartitionConsumableNotifier consumableNotifier,
PartitionStateChecker partitionStateChecker,
- Executor executor) {
+ Executor executor,
+ Configuration config,
+ ExecutionConfig execConfig) {
- TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
+ TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable, config, execConfig);
InputSplitProvider inputSplitProvider = new TaskInputSplitProvider(
jobManagerGateway,
@@ -655,19 +809,23 @@ public class TaskTest {
executor);
}
- private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
- SerializedValue<ExecutionConfig> execConfig;
+ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+ Class<? extends AbstractInvokable> invokable,
+ Configuration taskConfig,
+ ExecutionConfig execConfig) {
+
+ SerializedValue<ExecutionConfig> serializedExecConfig;
try {
- execConfig = new SerializedValue<>(new ExecutionConfig());
+ serializedExecConfig = new SerializedValue<>(execConfig);
} catch (IOException e) {
throw new RuntimeException(e);
}
-
+
return new TaskDeploymentDescriptor(
new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
- execConfig,
+ serializedExecConfig,
"Test Task", 1, 0, 1, 0,
- new Configuration(), new Configuration(),
+ new Configuration(), taskConfig,
invokable.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -869,4 +1027,71 @@ public class TaskTest {
throw new CancelTaskException();
}
}
+
+ public static final class InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable {
+
+ private final Object lock = new Object();
+
+ @Override
+ public void invoke() throws Exception {
+ synchronized (lock) {
+ awaitLatch.trigger();
+ wait();
+ }
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ synchronized (lock) {
+ cancelLatch.trigger();
+ }
+ }
+ }
+
+ public static final class InvokableBlockingInCancel extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ awaitLatch.trigger();
+
+ try {
+ cancelLatch.await();
+ synchronized (this) {
+ wait();
+ }
+ } catch (InterruptedException ignored) {
+ synchronized (this) {
+ notifyAll(); // notify all that are stuck in cancel
+ }
+ }
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ synchronized (this) {
+ cancelLatch.trigger();
+ wait();
+ }
+ }
+ }
+
+ public static final class InvokableUninterruptibleBlockingInvoke extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ while (!cancelLatch.isTriggered()) {
+ try {
+ synchronized (this) {
+ awaitLatch.trigger();
+ wait();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ }
+ }
}