You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/10 19:06:13 UTC
[2/2] flink git commit: [FLINK-9776] [runtime] Stop sending periodic
interrupts once executing thread leaves user function / operator code.
[FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.
This closes #6275
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53e66576
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53e66576
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53e66576
Branch: refs/heads/master
Commit: 53e6657658bc750b78c32e91fa7e2c02e8c54e33
Parents: 7953a2e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 6 13:34:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:50 2018 +0200
----------------------------------------------------------------------
.../jobgraph/tasks/AbstractInvokable.java | 43 +++++++++++++++++---
.../apache/flink/runtime/taskmanager/Task.java | 40 +++++++++++-------
.../streaming/runtime/tasks/StreamTask.java | 10 +++++
3 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 1734d68..a8d5697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -59,6 +59,9 @@ public abstract class AbstractInvokable {
/** The environment assigned to this invokable. */
private final Environment environment;
+ /** Flag whether cancellation should interrupt the executing thread. */
+ private volatile boolean shouldInterruptOnCancel = true;
+
/**
* Create an Invokable task and set its environment.
*
@@ -68,6 +71,10 @@ public abstract class AbstractInvokable {
this.environment = checkNotNull(environment);
}
+ // ------------------------------------------------------------------------
+ // Core methods
+ // ------------------------------------------------------------------------
+
/**
* Starts the execution.
*
@@ -95,8 +102,34 @@ public abstract class AbstractInvokable {
}
/**
+ * Sets whether the thread that executes the {@link #invoke()} method should be
+ * interrupted during cancellation. This method sets the flag for both the initial
+ * interrupt, as well as for the repeated interrupt. Setting the interruption to
+ * false at some point during the cancellation procedure is a way to stop further
+ * interrupts from happening.
+ */
+ public void setShouldInterruptOnCancel(boolean shouldInterruptOnCancel) {
+ this.shouldInterruptOnCancel = shouldInterruptOnCancel;
+ }
+
+ /**
+ * Checks whether the task should be interrupted during cancellation.
+ * This method is checked both for the initial interrupt, as well as for the
+ * repeated interrupt. Setting the interruption to false via
+ * {@link #setShouldInterruptOnCancel(boolean)} is a way to stop further interrupts
+ * from happening.
+ */
+ public boolean shouldInterruptOnCancel() {
+ return shouldInterruptOnCancel;
+ }
+
+ // ------------------------------------------------------------------------
+ // Access to Environment and Configuration
+ // ------------------------------------------------------------------------
+
+ /**
* Returns the environment of this task.
- *
+ *
* @return The environment of this task.
*/
public Environment getEnvironment() {
@@ -114,7 +147,7 @@ public abstract class AbstractInvokable {
/**
* Returns the current number of subtasks the respective task is split into.
- *
+ *
* @return the current number of subtasks the respective task is split into
*/
public int getCurrentNumberOfSubtasks() {
@@ -123,7 +156,7 @@ public abstract class AbstractInvokable {
/**
* Returns the index of this subtask in the subtask group.
- *
+ *
* @return the index of this subtask in the subtask group
*/
public int getIndexInSubtaskGroup() {
@@ -132,7 +165,7 @@ public abstract class AbstractInvokable {
/**
* Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}.
- *
+ *
* @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
*/
public Configuration getTaskConfiguration() {
@@ -141,7 +174,7 @@ public abstract class AbstractInvokable {
/**
* Returns the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- *
+ *
* @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
*/
public Configuration getJobConfiguration() {
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f8aa0e0..60b2ed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1046,19 +1046,22 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// the periodic interrupting thread - a different thread than the canceller, in case
// the application code does blocking stuff in its cancellation paths.
- Runnable interrupter = new TaskInterrupter(
- LOG,
- executingThread,
- taskNameWithSubtask,
- taskCancellationInterval);
+ if (invokable.shouldInterruptOnCancel()) {
+ Runnable interrupter = new TaskInterrupter(
+ LOG,
+ invokable,
+ executingThread,
+ taskNameWithSubtask,
+ taskCancellationInterval);
- Thread interruptingThread = new Thread(
- executingThread.getThreadGroup(),
- interrupter,
- String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
- interruptingThread.setDaemon(true);
- interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
- interruptingThread.start();
+ Thread interruptingThread = new Thread(
+ executingThread.getThreadGroup(),
+ interrupter,
+ String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
+ interruptingThread.setDaemon(true);
+ interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+ interruptingThread.start();
+ }
// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
@@ -1514,8 +1517,10 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
}
}
- // send the initial interruption signal
- executer.interrupt();
+ // send the initial interruption signal, if requested
+ if (invokable.shouldInterruptOnCancel()) {
+ executer.interrupt();
+ }
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
@@ -1532,6 +1537,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** The logger to report on the fatal condition. */
private final Logger log;
+ /** The invokable task. */
+ private final AbstractInvokable task;
+
/** The executing task thread that we wait for to terminate. */
private final Thread executerThread;
@@ -1543,11 +1551,13 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
TaskInterrupter(
Logger log,
+ AbstractInvokable task,
Thread executerThread,
String taskName,
long interruptIntervalMillis) {
this.log = log;
+ this.task = task;
this.executerThread = executerThread;
this.taskName = taskName;
this.interruptIntervalMillis = interruptIntervalMillis;
@@ -1563,7 +1573,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// log stack trace where the executing thread is stuck and
// interrupt the running thread periodically while it is still alive
- while (executerThread.isAlive()) {
+ while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
// build the stack trace of where the thread is stuck, for the log
StackTraceElement[] stack = executerThread.getStackTrace();
StringBuilder bld = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 41257ce..db504d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -341,6 +341,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// clean up everything we initialized
isRunning = false;
+ // Now that we are outside the user code, we do not want to be interrupted further
+ // upon cancellation. The shutdown logic below needs to make sure it does not issue calls
+ // that block and stall shutdown.
+ // Additionally, the cancellation watchdog will issue a hard-cancel (kill the TaskManager
+ // process) as a backup in case some shutdown procedure blocks outside our control.
+ setShouldInterruptOnCancel(false);
+
+ // clear any previously issued interrupt for a more graceful shutdown
+ Thread.interrupted();
+
// stop all timers and threads
tryShutdownTimerService();