You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/29 19:43:44 UTC

[flink] 02/03: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a73e94542f8992b5f6dd268c18f0c8d3bdb6c6a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Mar 17 23:08:46 2022 +0100

    [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.
    
    This change preserves the CheckpointOptions and properly integrates user-triggered snapshots and workflows with more than one source.
    The externally induced source now merely delays the barrier instead of being able to insert one on a whim, which would never work in the aforementioned setups.
---
 .../io/StreamTaskExternallyInducedSourceInput.java |  24 +++
 .../runtime/tasks/SourceOperatorStreamTask.java    | 187 +++++++++++++++++----
 .../tasks/SourceOperatorStreamTaskTest.java        |  79 ++++++++-
 3 files changed, 254 insertions(+), 36 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
index fff008c..ca0462e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
@@ -21,12 +21,14 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 /** A subclass of {@link StreamTaskSourceInput} for {@link ExternallyInducedSourceReader}. */
 public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceInput<T> {
     private final Consumer<Long> checkpointTriggeringHook;
     private final ExternallyInducedSourceReader<T, ?> sourceReader;
+    private CompletableFuture<?> blockFuture; // null while the input is not blocked
 
     @SuppressWarnings("unchecked")
     public StreamTaskExternallyInducedSourceInput(
@@ -39,12 +41,34 @@ public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceI
         this.sourceReader = (ExternallyInducedSourceReader<T, ?>) operator.getSourceReader();
     }
 
+    public void blockUntil(CompletableFuture<?> blockFuture) {
+        this.blockFuture = blockFuture;
+        // not synchronized: assumes the future is completed in the mailbox thread
+        blockFuture.whenComplete((v, e) -> unblock());
+    }
+
+    private void unblock() {
+        this.blockFuture = null;
+    }
+
     @Override
     public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
+        if (blockFuture != null) { // blocked by blockUntil(): emit nothing
+            return DataInputStatus.NOTHING_AVAILABLE;
+        }
+
         DataInputStatus status = super.emitNext(output);
         if (status == DataInputStatus.NOTHING_AVAILABLE) {
             sourceReader.shouldTriggerCheckpoint().ifPresent(checkpointTriggeringHook);
         }
         return status;
     }
+
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        if (blockFuture != null) { // while blocked, availability is tied to the block
+            return blockFuture;
+        }
+        return super.getAvailableFuture();
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index efc594b..3d28055 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -24,14 +24,12 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -48,7 +46,12 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -57,7 +60,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> {
 
     private AsyncDataOutputToOutput<T> output;
-    private boolean isExternallyInducedSource;
+    /**
+     * Contains information about all checkpoints where the RPC from the checkpoint coordinator
+     * arrived before the source reader triggered them (common case).
+     */
+    private final SortedMap<Long, UntriggeredCheckpoint> untriggeredCheckpoints = new TreeMap<>();
+    /**
+     * Contains the checkpoints that were triggered by the source reader but whose RPC from the
+     * checkpoint coordinator has yet to arrive. This may happen if the barrier is inserted as an
+     * event into the data plane by the source coordinator and the (distributed) source reader
+     * reads that event before receiving Flink's checkpoint RPC (rare case).
+     */
+    private final SortedSet<Long> triggeredCheckpoints = new TreeSet<>();
+    /**
+     * Blocks input until the RPCs of all checkpoints already triggered by the source reader have
+     * arrived. This future must only be accessed and completed in the mailbox thread.
+     */
+    private CompletableFuture<Void> waitForRPC = FutureUtils.completedVoidFuture();
+    /** Only set for externally induced sources. See also {@link #isExternallyInducedSource()}. */
+    private StreamTaskExternallyInducedSourceInput<T> externallyInducedSourceInput;
 
     public SourceOperatorStreamTask(Environment env) throws Exception {
         super(env);
@@ -79,14 +100,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         if (operatorChain.isTaskDeployedAsFinished()) {
             input = new StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0);
         } else if (sourceReader instanceof ExternallyInducedSourceReader) {
-            isExternallyInducedSource = true;
-
-            input =
+            externallyInducedSourceInput =
                     new StreamTaskExternallyInducedSourceInput<>(
                             sourceOperator,
                             this::triggerCheckpointForExternallyInducedSource,
                             0,
                             0);
+
+            input = externallyInducedSourceInput;
         } else {
             input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
         }
@@ -112,20 +133,53 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
+        if (!isExternallyInducedSource()) { // regular sources: checkpoint right away
+            return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions);
+        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // move the RPC to the mailbox thread right away so the fields need no synchronization
+        mainMailboxExecutor.execute(
+                () ->
+                        triggerCheckpointOnExternallyInducedSource(
+                                checkpointMetaData, checkpointOptions, triggerFuture),
+                "SourceOperatorStreamTask#triggerCheckpointAsync(%s, %s)",
+                checkpointMetaData,
+                checkpointOptions);
+        return triggerFuture;
+    }
+
+    private boolean isExternallyInducedSource() {
+        return externallyInducedSourceInput != null; // set only for ExternallyInducedSourceReader
+    }
+
+    private void triggerCheckpointOnExternallyInducedSource(
+            CheckpointMetaData checkpointMetaData,
+            CheckpointOptions checkpointOptions,
+            CompletableFuture<Boolean> triggerFuture) {
+        assert (mailboxProcessor.isMailboxThread());
+        if (!triggeredCheckpoints.remove(checkpointMetaData.getCheckpointId())) {
+            // common case: the RPC arrives before the source reader triggers the checkpoint,
+            // so keep its metadata and options until the trigger arrives
+            untriggeredCheckpoints.put(
+                    checkpointMetaData.getCheckpointId(),
+                    new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
+            triggerFuture.complete(isRunning());
+        } else {
+            // rare case: the source reader already triggered it, so checkpoint right away
+            FutureUtils.forward(
+                    triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions),
+                    triggerFuture);
+            // drop older orphaned entries; may unblock the input (see maybeResumeProcessing)
+            cleanupOldCheckpoints(checkpointMetaData.getCheckpointId());
+        }
+    }
+
+    private CompletableFuture<Boolean> triggerCheckpointNowAsync(
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+        if (isSynchronous(checkpointOptions.getCheckpointType())) { // stop-with-savepoint
+            return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
         } else {
-            return CompletableFuture.completedFuture(isRunning());
+            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
         }
     }
 
@@ -159,22 +213,76 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId); // forget pending state; may unblock the input
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute( // cleanup must run in the mailbox thread
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute( // cleanup must run in the mailbox thread
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            // common case: the RPC arrived before the external source induced the checkpoint
+            triggerCheckpointNowAsync( // NOTE(review): returned future is ignored
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+            cleanupOldCheckpoints(checkpointId);
+        } else {
+            // rare case: induced before the RPC arrived; block the input until it does
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
+
+    /**
+     * Cleans up orphaned checkpoints older than the given, currently triggered checkpoint. Such
+     * orphans may occur when a checkpoint is cancelled but the RPC is lost. Note, to be safe,
+     * checkpoint X is only removed once both RPC and trigger for a checkpoint Y > X are received.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+
+        maybeResumeProcessing();
+    }
+
+    /** Unblocks the input if no source-triggered checkpoint still awaits its RPC; else a no-op. */
+    private void maybeResumeProcessing() {
+        assert (mailboxProcessor.isMailboxThread());
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+        if (triggeredCheckpoints.isEmpty()) {
+            waitForRPC.complete(null);
+        }
+    }
+
+    /** Removes temporary data about a declined, aborted or subsumed checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.remove(checkpointId);
+        untriggeredCheckpoints.remove(checkpointId);
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+        maybeResumeProcessing();
     }
 
     // ---------------------------
@@ -225,4 +333,23 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
             output.emitWatermarkStatus(watermarkStatus);
         }
     }
+
+    private static class UntriggeredCheckpoint { // RPC data awaiting the source trigger
+        private final CheckpointMetaData metadata;
+        private final CheckpointOptions checkpointOptions;
+
+        private UntriggeredCheckpoint(
+                CheckpointMetaData metadata, CheckpointOptions checkpointOptions) {
+            this.metadata = metadata;
+            this.checkpointOptions = checkpointOptions;
+        }
+
+        public CheckpointMetaData getMetadata() {
+            return metadata;
+        }
+
+        public CheckpointOptions getCheckpointOptions() {
+            return checkpointOptions;
+        }
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 0ea212a..d3b9f64 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -61,6 +62,8 @@ import org.apache.flink.util.SerializedValue;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -74,6 +77,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -86,6 +90,10 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
 
     private static final OperatorID OPERATOR_ID = new OperatorID();
     private static final int NUM_RECORDS = 10;
+    public static final CheckpointStorageLocationReference SAVEPOINT_LOCATION =
+            new CheckpointStorageLocationReference("Savepoint".getBytes()); // savepoints
+    public static final CheckpointStorageLocationReference CHECKPOINT_LOCATION =
+            new CheckpointStorageLocationReference("Checkpoint".getBytes()); // checkpoints
 
     @Test
     void testMetrics() throws Exception {
@@ -157,8 +165,35 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
         }
     }
 
-    @Test
-    void testExternallyInducedSource() throws Exception {
+    static Stream<?> provideExternallyInducedParameters() { // (CheckpointOptions, rpcFirst) pairs
+        return Stream.of(
+                        CheckpointOptions.alignedNoTimeout(
+                                SavepointType.savepoint(SavepointFormatType.CANONICAL),
+                                SAVEPOINT_LOCATION),
+                        CheckpointOptions.alignedNoTimeout(
+                                SavepointType.terminate(SavepointFormatType.CANONICAL),
+                                SAVEPOINT_LOCATION),
+                        CheckpointOptions.alignedNoTimeout(
+                                SavepointType.suspend(SavepointFormatType.CANONICAL),
+                                SAVEPOINT_LOCATION),
+                        CheckpointOptions.alignedNoTimeout(
+                                CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION),
+                        CheckpointOptions.alignedWithTimeout(
+                                CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION, 123L),
+                        CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION),
+                        CheckpointOptions.notExactlyOnce(
+                                CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION))
+                .flatMap(
+                        options ->
+                                Stream.of(
+                                        new Object[] {options, true}, // RPC first
+                                        new Object[] {options, false})); // trigger first
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideExternallyInducedParameters")
+    void testExternallyInducedSource(CheckpointOptions checkpointOptions, boolean rpcFirst)
+            throws Exception {
         final int numEventsBeforeCheckpoint = 10;
         final int totalNumEvents = 20;
         TestingExternallyInducedSourceReader testingReader =
@@ -170,15 +205,47 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
                             ((SourceOperator) testHarness.getStreamTask().mainOperator)
                                     .getSourceReader();
 
-            testHarness.processAll();
+            CheckpointMetaData checkpointMetaData =
+                    new CheckpointMetaData(TestingExternallyInducedSourceReader.CHECKPOINT_ID, 2);
+            if (rpcFirst) {
+                testHarness.streamTask.triggerCheckpointAsync(
+                        checkpointMetaData, checkpointOptions);
+                testHarness.processAll();
+            } else {
+                do {
+                    testHarness.processSingleStep();
+                } while (!runtimeTestingReader.shouldTriggerCheckpoint().isPresent());
+                // the input should block: the source triggered, but no RPC has arrived yet
+                assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isFalse();
+                CompletableFuture<Boolean> triggerCheckpointAsync =
+                        testHarness.streamTask.triggerCheckpointAsync(
+                                checkpointMetaData, checkpointOptions);
+                // process mails until the checkpoint RPC has been handled
+                while (!triggerCheckpointAsync.isDone()) {
+                    testHarness.processSingleStep();
+                }
+                // stream task should be unblocked now
+                assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isTrue();
+                testHarness.processAll();
+            }
 
-            assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents);
+            int expectedEvents =
+                    checkpointOptions.getCheckpointType().isSavepoint()
+                                    && ((SavepointType) checkpointOptions.getCheckpointType())
+                                            .isSynchronous()
+                            ? numEventsBeforeCheckpoint
+                            : totalNumEvents;
+            assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(expectedEvents);
             assertThat(runtimeTestingReader.checkpointed).isTrue();
             assertThat(runtimeTestingReader.checkpointedId)
                     .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID);
             assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint);
             Assertions.assertThat(testHarness.getOutput())
-                    .contains(new CheckpointBarrier(2, 2, checkpointOptions));
+                    .contains(
+                            new CheckpointBarrier(
+                                    checkpointMetaData.getCheckpointId(),
+                                    checkpointMetaData.getTimestamp(),
+                                    checkpointOptions));
         }
     }
 
@@ -262,7 +329,7 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
                             new CheckpointMetaData(2, 2),
                             CheckpointOptions.alignedNoTimeout(
                                     SavepointType.terminate(SavepointFormatType.CANONICAL),
-                                    CheckpointStorageLocationReference.getDefault()));
+                                    SAVEPOINT_LOCATION));
             checkpointCompleted.whenComplete(
                     (ignored, exception) ->
                             testHarness.streamTask.notifyCheckpointCompleteAsync(2));