You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/23 17:47:29 UTC

[flink] branch release-1.11 updated: [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 463d9e3  [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
463d9e3 is described below

commit 463d9e3d32af81c6cd210847b7c0068774221224
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 19 17:47:40 2021 +0100

    [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
    
    Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
    clean shutdown after the stop with savepoint (which can produce some records to process after
    the savepoint while stopping). If we interrupt the source thread, we might leave the network stack
    in an inconsistent state. So, if we want to rely on the clean shutdown, we cannot interrupt
    the source thread.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 27 ++++++--
 .../runtime/tasks/SourceStreamTaskTest.java        | 81 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index f407e08..e749c52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -173,13 +173,32 @@ public class SourceStreamTask<
 
     @Override
     protected void cancelTask() {
+        cancelTask(true);
+    }
+
+    @Override
+    protected void finishTask() {
+        wasStoppedExternally = true;
+        /**
+         * Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
+         * clean shutdown after the stop with savepoint (which can produce some records to process
+         * after the savepoint while stopping). If we interrupt source thread, we might leave the
+         * network stack in an inconsistent state. So, if we want to rely on the clean shutdown, we
+         * cannot interrupt the source thread.
+         */
+        cancelTask(false);
+    }
+
+    private void cancelTask(boolean interrupt) {
         try {
             if (headOperator != null) {
                 headOperator.cancel();
             }
         } finally {
             if (sourceThread.isAlive()) {
-                sourceThread.interrupt();
+                if (interrupt) {
+                    sourceThread.interrupt();
+                }
             } else if (!sourceThread.getCompletionFuture().isDone()) {
                 // source thread didn't start
                 sourceThread.getCompletionFuture().complete(null);
@@ -188,12 +207,6 @@ public class SourceStreamTask<
     }
 
     @Override
-    protected void finishTask() throws Exception {
-        wasStoppedExternally = true;
-        cancelTask();
-    }
-
-    @Override
     protected CompletableFuture<Void> getCompletionFuture() {
         return sourceThread.getCompletionFuture();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 6b6677d..ebafdbf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -71,6 +72,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
@@ -573,6 +575,45 @@ public class SourceStreamTaskTest {
         harness.waitForTaskCompletion(Long.MAX_VALUE, true);
     }
 
+    @Test
+    public void testStopWithSavepointShouldNotInterruptTheSource() throws Exception {
+        long checkpointId = 1;
+        WasInterruptedTestingSource interruptedTestingSource = new WasInterruptedTestingSource();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new StreamSource<>(interruptedTestingSource))
+                        .build()) {
+
+            harness.processSingleStep();
+
+            Future<Boolean> triggerFuture =
+                    harness.streamTask.triggerCheckpointAsync(
+                            new CheckpointMetaData(checkpointId, 1),
+                            new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                            false);
+            while (!triggerFuture.isDone()) {
+                harness.streamTask.runMailboxStep();
+            }
+            triggerFuture.get();
+
+            Future<Void> notifyFuture =
+                    harness.streamTask.notifyCheckpointCompleteAsync(checkpointId);
+            while (!notifyFuture.isDone()) {
+                harness.streamTask.runMailboxStep();
+            }
+            notifyFuture.get();
+
+            WasInterruptedTestingSource.allowExit();
+
+            harness.waitForTaskCompletion();
+            harness.finishProcessing();
+
+            assertTrue(notifyFuture.isDone());
+            assertFalse(interruptedTestingSource.wasInterrupted());
+        }
+    }
+
     private static class MockSource
             implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
         private static final long serialVersionUID = 1;
@@ -858,4 +899,44 @@ public class SourceStreamTaskTest {
             output.collect(new StreamRecord<>(record));
         }
     }
+
+    /**
+     * This source sleeps a little bit before processing cancellation and records whether it was
+     * interrupted by the {@link SourceStreamTask} or not.
+     */
+    private static class WasInterruptedTestingSource implements SourceFunction<String> {
+        private static final long serialVersionUID = 1L;
+
+        private static final OneShotLatch ALLOW_EXIT = new OneShotLatch();
+        private static final AtomicBoolean WAS_INTERRUPTED = new AtomicBoolean();
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            ALLOW_EXIT.reset();
+            WAS_INTERRUPTED.set(false);
+
+            try {
+                while (running || !ALLOW_EXIT.isTriggered()) {
+                    Thread.sleep(1);
+                }
+            } catch (InterruptedException e) {
+                WAS_INTERRUPTED.set(true);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        public static boolean wasInterrupted() {
+            return WAS_INTERRUPTED.get();
+        }
+
+        public static void allowExit() {
+            ALLOW_EXIT.trigger();
+        }
+    }
 }