You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/10 19:06:12 UTC

[1/2] flink git commit: [FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.

Repository: flink
Updated Branches:
  refs/heads/master abfdc1a2d -> 53e665765


[FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.

This closes #6290


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7953a2ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7953a2ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7953a2ef

Branch: refs/heads/master
Commit: 7953a2efea6aaa745ace7843770d12986f798b6d
Parents: abfdc1a
Author: Jamie Grier <jg...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:28:26 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/ShardConsumer.java          | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7953a2ef/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
 				}
 			}
 
+			long lastTimeNanos = 0;
 			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
 					break;
 				} else {
 					if (fetchIntervalMillis != 0) {
-						Thread.sleep(fetchIntervalMillis);
+						long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
+						long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+						if (sleepTimeMillis > 0) {
+							Thread.sleep(sleepTimeMillis);
+						}
+						lastTimeNanos = System.nanoTime();
 					}
 
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);


[2/2] flink git commit: [FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.

Posted by se...@apache.org.
[FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.

This closes #6275


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53e66576
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53e66576
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53e66576

Branch: refs/heads/master
Commit: 53e6657658bc750b78c32e91fa7e2c02e8c54e33
Parents: 7953a2e
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 6 13:34:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:50 2018 +0200

----------------------------------------------------------------------
 .../jobgraph/tasks/AbstractInvokable.java       | 43 +++++++++++++++++---
 .../apache/flink/runtime/taskmanager/Task.java  | 40 +++++++++++-------
 .../streaming/runtime/tasks/StreamTask.java     | 10 +++++
 3 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 1734d68..a8d5697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -59,6 +59,9 @@ public abstract class AbstractInvokable {
 	/** The environment assigned to this invokable. */
 	private final Environment environment;
 
+	/** Flag whether cancellation should interrupt the executing thread. */
+	private volatile boolean shouldInterruptOnCancel = true;
+
 	/**
 	 * Create an Invokable task and set its environment.
 	 *
@@ -68,6 +71,10 @@ public abstract class AbstractInvokable {
 		this.environment = checkNotNull(environment);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Core methods
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Starts the execution.
 	 *
@@ -95,8 +102,34 @@ public abstract class AbstractInvokable {
 	}
 
 	/**
+	 * Sets whether the thread that executes the {@link #invoke()} method should be
+	 * interrupted during cancellation. This method sets the flag for both the initial
+	 * interrupt, as well as for the repeated interrupt. Setting the interruption to
+	 * false at some point during the cancellation procedure is a way to stop further
+	 * interrupts from happening.
+	 */
+	public void setShouldInterruptOnCancel(boolean shouldInterruptOnCancel) {
+		this.shouldInterruptOnCancel = shouldInterruptOnCancel;
+	}
+
+	/**
+	 * Checks whether the task should be interrupted during cancellation.
+	 * This method is checked both for the initial interrupt, as well as for the
+	 * repeated interrupt. Setting the interruption to false via
+	 * {@link #setShouldInterruptOnCancel(boolean)} is a way to stop further interrupts
+	 * from happening.
+	 */
+	public boolean shouldInterruptOnCancel() {
+		return shouldInterruptOnCancel;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Access to Environment and Configuration
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Returns the environment of this task.
-	 * 
+	 *
 	 * @return The environment of this task.
 	 */
 	public Environment getEnvironment() {
@@ -114,7 +147,7 @@ public abstract class AbstractInvokable {
 
 	/**
 	 * Returns the current number of subtasks the respective task is split into.
-	 * 
+	 *
 	 * @return the current number of subtasks the respective task is split into
 	 */
 	public int getCurrentNumberOfSubtasks() {
@@ -123,7 +156,7 @@ public abstract class AbstractInvokable {
 
 	/**
 	 * Returns the index of this subtask in the subtask group.
-	 * 
+	 *
 	 * @return the index of this subtask in the subtask group
 	 */
 	public int getIndexInSubtaskGroup() {
@@ -132,7 +165,7 @@ public abstract class AbstractInvokable {
 
 	/**
 	 * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}.
-	 * 
+	 *
 	 * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
 	 */
 	public Configuration getTaskConfiguration() {
@@ -141,7 +174,7 @@ public abstract class AbstractInvokable {
 
 	/**
 	 * Returns the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}.
-	 * 
+	 *
 	 * @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
 	 */
 	public Configuration getJobConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f8aa0e0..60b2ed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1046,19 +1046,22 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 						// the periodic interrupting thread - a different thread than the canceller, in case
 						// the application code does blocking stuff in its cancellation paths.
-						Runnable interrupter = new TaskInterrupter(
-								LOG,
-								executingThread,
-								taskNameWithSubtask,
-								taskCancellationInterval);
+						if (invokable.shouldInterruptOnCancel()) {
+							Runnable interrupter = new TaskInterrupter(
+									LOG,
+									invokable,
+									executingThread,
+									taskNameWithSubtask,
+									taskCancellationInterval);
 
-						Thread interruptingThread = new Thread(
-								executingThread.getThreadGroup(),
-								interrupter,
-								String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
-						interruptingThread.setDaemon(true);
-						interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
-						interruptingThread.start();
+							Thread interruptingThread = new Thread(
+									executingThread.getThreadGroup(),
+									interrupter,
+									String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
+							interruptingThread.setDaemon(true);
+							interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+							interruptingThread.start();
+						}
 
 						// if a cancellation timeout is set, the watchdog thread kills the process
 						// if graceful cancellation does not succeed
@@ -1514,8 +1517,10 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 					}
 				}
 
-				// send the initial interruption signal
-				executer.interrupt();
+				// send the initial interruption signal, if requested
+				if (invokable.shouldInterruptOnCancel()) {
+					executer.interrupt();
+				}
 			}
 			catch (Throwable t) {
 				ExceptionUtils.rethrowIfFatalError(t);
@@ -1532,6 +1537,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		/** The logger to report on the fatal condition. */
 		private final Logger log;
 
+		/** The invokable task. */
+		private final AbstractInvokable task;
+
 		/** The executing task thread that we wait for to terminate. */
 		private final Thread executerThread;
 
@@ -1543,11 +1551,13 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 		TaskInterrupter(
 				Logger log,
+				AbstractInvokable task,
 				Thread executerThread,
 				String taskName,
 				long interruptIntervalMillis) {
 
 			this.log = log;
+			this.task = task;
 			this.executerThread = executerThread;
 			this.taskName = taskName;
 			this.interruptIntervalMillis = interruptIntervalMillis;
@@ -1563,7 +1573,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 				// log stack trace where the executing thread is stuck and
 				// interrupt the running thread periodically while it is still alive
-				while (executerThread.isAlive()) {
+				while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
 					// build the stack trace of where the thread is stuck, for the log
 					StackTraceElement[] stack = executerThread.getStackTrace();
 					StringBuilder bld = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/53e66576/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 41257ce..db504d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -341,6 +341,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// clean up everything we initialized
 			isRunning = false;
 
+			// Now that we are outside the user code, we do not want to be interrupted further
+			// upon cancellation. The shutdown logic below needs to make sure it does not issue calls
+			// that block and stall shutdown.
+			// Additionally, the cancellation watchdog will issue a hard-cancel (kill the TaskManager
+			// process) as a backup in case some shutdown procedure blocks outside our control.
+			setShouldInterruptOnCancel(false);
+
+			// clear any previously issued interrupt for a more graceful shutdown
+			Thread.interrupted();
+
 			// stop all timers and threads
 			tryShutdownTimerService();