You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/23 17:47:29 UTC
[flink] branch release-1.11 updated: [FLINK-21028][task] Do not
interrupt the source thread on stop with savepoint
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 463d9e3 [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
463d9e3 is described below
commit 463d9e3d32af81c6cd210847b7c0068774221224
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 19 17:47:40 2021 +0100
[FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs a
clean shutdown after the stop with savepoint (which can produce some records to process after
the savepoint while stopping). If we interrupt the source thread, we might leave the network stack
in an inconsistent state. So, if we want to rely on the clean shutdown, we cannot interrupt
the source thread.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 27 ++++++--
.../runtime/tasks/SourceStreamTaskTest.java | 81 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index f407e08..e749c52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -173,13 +173,32 @@ public class SourceStreamTask<
@Override
protected void cancelTask() {
+ cancelTask(true); // regular cancellation: cancel the head operator and interrupt a live source thread
+ }
+
+ @Override
+ protected void finishTask() {
+ wasStoppedExternally = true;
+ /*
+ * Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
+ * a clean shutdown after the savepoint (which can produce some records to process after the
+ * savepoint while stopping). If we interrupted the source thread, we might leave the
+ * network stack in an inconsistent state. Since we rely on that clean shutdown, we must
+ * not interrupt the source thread here.
+ */
+ cancelTask(false);
+ }
+
+ private void cancelTask(boolean interrupt) {
try {
if (headOperator != null) {
headOperator.cancel();
}
} finally {
if (sourceThread.isAlive()) {
- sourceThread.interrupt();
+ if (interrupt) {
+ sourceThread.interrupt();
+ }
} else if (!sourceThread.getCompletionFuture().isDone()) {
// source thread didn't start
sourceThread.getCompletionFuture().complete(null);
@@ -188,12 +207,6 @@ public class SourceStreamTask<
}
@Override
- protected void finishTask() throws Exception {
- wasStoppedExternally = true;
- cancelTask();
- }
-
- @Override
protected CompletableFuture<Void> getCompletionFuture() {
return sourceThread.getCompletionFuture();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 6b6677d..ebafdbf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -71,6 +72,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
@@ -573,6 +575,45 @@ public class SourceStreamTaskTest {
harness.waitForTaskCompletion(Long.MAX_VALUE, true);
}
+ @Test
+ public void testStopWithSavepointShouldNotInterruptTheSource() throws Exception {
+ long checkpointId = 1;
+ WasInterruptedTestingSource interruptedTestingSource = new WasInterruptedTestingSource();
+ try (StreamTaskMailboxTestHarness<String> harness =
+ new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+ .setupOutputForSingletonOperatorChain(
+ new StreamSource<>(interruptedTestingSource))
+ .build()) {
+
+ harness.processSingleStep();
+
+ Future<Boolean> triggerFuture =
+ harness.streamTask.triggerCheckpointAsync(
+ new CheckpointMetaData(checkpointId, 1),
+ new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+ false);
+ while (!triggerFuture.isDone()) { // pump the mailbox until the savepoint trigger completes
+ harness.streamTask.runMailboxStep();
+ }
+ triggerFuture.get();
+
+ Future<Void> notifyFuture =
+ harness.streamTask.notifyCheckpointCompleteAsync(checkpointId);
+ while (!notifyFuture.isDone()) { // pump the mailbox until the completion notification is handled
+ harness.streamTask.runMailboxStep();
+ }
+ notifyFuture.get();
+
+ WasInterruptedTestingSource.allowExit(); // release the source loop; it also requires cancel() to exit
+
+ harness.waitForTaskCompletion();
+ harness.finishProcessing();
+
+ assertTrue(notifyFuture.isDone());
+ assertFalse(interruptedTestingSource.wasInterrupted()); // stop-with-savepoint must not interrupt the source
+ }
+ }
+
private static class MockSource
implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
@@ -858,4 +899,44 @@ public class SourceStreamTaskTest {
output.collect(new StreamRecord<>(record));
}
}
+
+ /**
+ * This source keeps sleeping in 1 ms steps until it is both cancelled and released via
+ * {@link #allowExit()}, and records whether the {@link SourceStreamTask} interrupted it.
+ */
+ private static class WasInterruptedTestingSource implements SourceFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ private static final OneShotLatch ALLOW_EXIT = new OneShotLatch();
+ private static final AtomicBoolean WAS_INTERRUPTED = new AtomicBoolean();
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ ALLOW_EXIT.reset(); // NOTE(review): an allowExit() issued before run() starts is lost here
+ WAS_INTERRUPTED.set(false);
+
+ try {
+ while (running || !ALLOW_EXIT.isTriggered()) { // exit only once cancelled AND released
+ Thread.sleep(1);
+ }
+ } catch (InterruptedException e) {
+ WAS_INTERRUPTED.set(true); // swallowed on purpose: only recorded for the test assertion
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ public static boolean wasInterrupted() {
+ return WAS_INTERRUPTED.get();
+ }
+
+ public static void allowExit() {
+ ALLOW_EXIT.trigger();
+ }
+ }
}