You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/10 19:06:12 UTC
[1/2] flink git commit: [FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.
Repository: flink
Updated Branches:
refs/heads/master abfdc1a2d -> 53e665765
[FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.
This closes #6290
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7953a2ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7953a2ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7953a2ef
Branch: refs/heads/master
Commit: 7953a2efea6aaa745ace7843770d12986f798b6d
Parents: abfdc1a
Author: Jamie Grier <jg...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:28:26 2018 +0200
----------------------------------------------------------------------
.../connectors/kinesis/internals/ShardConsumer.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7953a2ef/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
}
}
+ long lastTimeNanos = 0;
while (isRunning()) {
if (nextShardItr == null) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
break;
} else {
if (fetchIntervalMillis != 0) {
- Thread.sleep(fetchIntervalMillis);
+ long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
+ long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+ if (sleepTimeMillis > 0) {
+ Thread.sleep(sleepTimeMillis);
+ }
+ lastTimeNanos = System.nanoTime();
}
GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
[2/2] flink git commit: [FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.
Posted by se...@apache.org.
[FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.
This closes #6275
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53e66576
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53e66576
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53e66576
Branch: refs/heads/master
Commit: 53e6657658bc750b78c32e91fa7e2c02e8c54e33
Parents: 7953a2e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 6 13:34:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:50 2018 +0200
----------------------------------------------------------------------
.../jobgraph/tasks/AbstractInvokable.java | 43 +++++++++++++++++---
.../apache/flink/runtime/taskmanager/Task.java | 40 +++++++++++-------
.../streaming/runtime/tasks/StreamTask.java | 10 +++++
3 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 1734d68..a8d5697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -59,6 +59,9 @@ public abstract class AbstractInvokable {
/** The environment assigned to this invokable. */
private final Environment environment;
+ /** Flag whether cancellation should interrupt the executing thread. */
+ private volatile boolean shouldInterruptOnCancel = true;
+
/**
* Create an Invokable task and set its environment.
*
@@ -68,6 +71,10 @@ public abstract class AbstractInvokable {
this.environment = checkNotNull(environment);
}
+ // ------------------------------------------------------------------------
+ // Core methods
+ // ------------------------------------------------------------------------
+
/**
* Starts the execution.
*
@@ -95,8 +102,34 @@ public abstract class AbstractInvokable {
}
/**
+ * Sets whether the thread that executes the {@link #invoke()} method should be
+ * interrupted during cancellation. This method sets the flag for both the initial
+ * interrupt, as well as for the repeated interrupt. Setting the interruption to
+ * false at some point during the cancellation procedure is a way to stop further
+ * interrupts from happening.
+ */
+ public void setShouldInterruptOnCancel(boolean shouldInterruptOnCancel) {
+ this.shouldInterruptOnCancel = shouldInterruptOnCancel;
+ }
+
+ /**
+ * Checks whether the task should be interrupted during cancellation.
+ * This flag is checked both for the initial interrupt, as well as for the
+ * repeated interrupt. Setting the interruption to false via
+ * {@link #setShouldInterruptOnCancel(boolean)} is a way to stop further interrupts
+ * from happening.
+ */
+ public boolean shouldInterruptOnCancel() {
+ return shouldInterruptOnCancel;
+ }
+
+ // ------------------------------------------------------------------------
+ // Access to Environment and Configuration
+ // ------------------------------------------------------------------------
+
+ /**
* Returns the environment of this task.
- *
+ *
* @return The environment of this task.
*/
public Environment getEnvironment() {
@@ -114,7 +147,7 @@ public abstract class AbstractInvokable {
/**
* Returns the current number of subtasks the respective task is split into.
- *
+ *
* @return the current number of subtasks the respective task is split into
*/
public int getCurrentNumberOfSubtasks() {
@@ -123,7 +156,7 @@ public abstract class AbstractInvokable {
/**
* Returns the index of this subtask in the subtask group.
- *
+ *
* @return the index of this subtask in the subtask group
*/
public int getIndexInSubtaskGroup() {
@@ -132,7 +165,7 @@ public abstract class AbstractInvokable {
/**
* Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}.
- *
+ *
* @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
*/
public Configuration getTaskConfiguration() {
@@ -141,7 +174,7 @@ public abstract class AbstractInvokable {
/**
* Returns the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- *
+ *
* @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
*/
public Configuration getJobConfiguration() {
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f8aa0e0..60b2ed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1046,19 +1046,22 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// the periodic interrupting thread - a different thread than the canceller, in case
// the application code does blocking stuff in its cancellation paths.
- Runnable interrupter = new TaskInterrupter(
- LOG,
- executingThread,
- taskNameWithSubtask,
- taskCancellationInterval);
+ if (invokable.shouldInterruptOnCancel()) {
+ Runnable interrupter = new TaskInterrupter(
+ LOG,
+ invokable,
+ executingThread,
+ taskNameWithSubtask,
+ taskCancellationInterval);
- Thread interruptingThread = new Thread(
- executingThread.getThreadGroup(),
- interrupter,
- String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
- interruptingThread.setDaemon(true);
- interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
- interruptingThread.start();
+ Thread interruptingThread = new Thread(
+ executingThread.getThreadGroup(),
+ interrupter,
+ String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
+ interruptingThread.setDaemon(true);
+ interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+ interruptingThread.start();
+ }
// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
@@ -1514,8 +1517,10 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
}
}
- // send the initial interruption signal
- executer.interrupt();
+ // send the initial interruption signal, if requested
+ if (invokable.shouldInterruptOnCancel()) {
+ executer.interrupt();
+ }
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
@@ -1532,6 +1537,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** The logger to report on the fatal condition. */
private final Logger log;
+ /** The invokable task. */
+ private final AbstractInvokable task;
+
/** The executing task thread that we wait for to terminate. */
private final Thread executerThread;
@@ -1543,11 +1551,13 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
TaskInterrupter(
Logger log,
+ AbstractInvokable task,
Thread executerThread,
String taskName,
long interruptIntervalMillis) {
this.log = log;
+ this.task = task;
this.executerThread = executerThread;
this.taskName = taskName;
this.interruptIntervalMillis = interruptIntervalMillis;
@@ -1563,7 +1573,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// log stack trace where the executing thread is stuck and
// interrupt the running thread periodically while it is still alive
- while (executerThread.isAlive()) {
+ while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
// build the stack trace of where the thread is stuck, for the log
StackTraceElement[] stack = executerThread.getStackTrace();
StringBuilder bld = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 41257ce..db504d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -341,6 +341,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// clean up everything we initialized
isRunning = false;
+ // Now that we are outside the user code, we do not want to be interrupted further
+ // upon cancellation. The shutdown logic below needs to make sure it does not issue calls
+ // that block and stall shutdown.
+ // Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
+ // process) as a backup in case some shutdown procedure blocks outside our control.
+ setShouldInterruptOnCancel(false);
+
+ // clear any previously issued interrupt for a more graceful shutdown
+ Thread.interrupted();
+
// stop all timers and threads
tryShutdownTimerService();