You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/29 19:43:44 UTC
[flink] 02/03: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a73e94542f8992b5f6dd268c18f0c8d3bdb6c6a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Mar 17 23:08:46 2022 +0100
[FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.
This change preserves the CheckpointOptions and properly integrates user-triggered snapshots and workflows with more than one source.
The externally induced source now merely delays the barrier instead of being able to insert one at a whim, which would never work in the aforementioned setups.
---
.../io/StreamTaskExternallyInducedSourceInput.java | 24 +++
.../runtime/tasks/SourceOperatorStreamTask.java | 187 +++++++++++++++++----
.../tasks/SourceOperatorStreamTaskTest.java | 79 ++++++++-
3 files changed, 254 insertions(+), 36 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
index fff008c..ca0462e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
@@ -21,12 +21,14 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.streaming.api.operators.SourceOperator;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/** A subclass of {@link StreamTaskSourceInput} for {@link ExternallyInducedSourceReader}. */
public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceInput<T> {
private final Consumer<Long> checkpointTriggeringHook;
private final ExternallyInducedSourceReader<T, ?> sourceReader;
+ private CompletableFuture<?> blockFuture;
@SuppressWarnings("unchecked")
public StreamTaskExternallyInducedSourceInput(
@@ -39,12 +41,34 @@ public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceI
this.sourceReader = (ExternallyInducedSourceReader<T, ?>) operator.getSourceReader();
}
+ /**
+ * Blocks this input until the given future completes. While blocked, {@link #emitNext} reports
+ * {@link DataInputStatus#NOTHING_AVAILABLE} and {@link #getAvailableFuture()} returns the given
+ * future.
+ *
+ * @param blockFuture future whose completion lifts the block
+ */
+ public void blockUntil(CompletableFuture<?> blockFuture) {
+ this.blockFuture = blockFuture;
+ // assume that the future is completed in mailbox thread
+ blockFuture.whenComplete((v, e) -> unblock(blockFuture));
+ }
+
+ /**
+ * Lifts the block installed by {@link #blockUntil}, but only if {@code completedFuture} is still
+ * the active block. A stale future that completes late must not clear a newer block.
+ */
+ private void unblock(CompletableFuture<?> completedFuture) {
+ if (this.blockFuture == completedFuture) {
+ this.blockFuture = null;
+ }
+ }
+
+ /**
+ * Emits the next record unless the input is blocked via {@link #blockUntil}. When no data is
+ * available, asks the reader whether it wants to trigger a checkpoint.
+ */
@Override
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
+ // while blocked (see blockUntil), report that no data is available and do not poll the reader
+ if (blockFuture != null) {
+ return DataInputStatus.NOTHING_AVAILABLE;
+ }
+
DataInputStatus status = super.emitNext(output);
if (status == DataInputStatus.NOTHING_AVAILABLE) {
sourceReader.shouldTriggerCheckpoint().ifPresent(checkpointTriggeringHook);
}
return status;
}
+
+ /**
+ * Returns the block future while blocked via {@link #blockUntil}, otherwise the availability of
+ * the underlying input.
+ */
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ // while blocked, availability is tied to the block future instead of the underlying input
+ if (blockFuture != null) {
+ return blockFuture;
+ }
+ return super.getAvailableFuture();
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index efc594b..3d28055 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -24,14 +24,12 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -48,7 +46,12 @@ import org.apache.flink.util.concurrent.FutureUtils;
import javax.annotation.Nullable;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,7 +60,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> {
private AsyncDataOutputToOutput<T> output;
- private boolean isExternallyInducedSource;
+ /**
+ * Contains information about all checkpoints where the RPC from the checkpoint coordinator
+ * arrives before the source reader triggers it. (Common case) Sorted by checkpoint id so that
+ * older, orphaned entries can be pruned with {@code headMap}. Only accessed in the mailbox
+ * thread.
+ */
+ private final SortedMap<Long, UntriggeredCheckpoint> untriggeredCheckpoints = new TreeMap<>();
+ /**
+ * Contains the checkpoints that are triggered by the source but the RPC from checkpoint
+ * coordinator has yet to arrive. This may happen if the barrier is inserted as an event into
+ * the data plane by the source coordinator and the (distributed) source reader reads that event
+ * before receiving Flink's checkpoint RPC. (Rare case) Sorted by checkpoint id so that older,
+ * orphaned entries can be pruned with {@code headSet}. Only accessed in the mailbox thread.
+ */
+ private final SortedSet<Long> triggeredCheckpoints = new TreeSet<>();
+ /**
+ * Blocks input until the RPC call has been received that corresponds to the triggered
+ * checkpoint. This future must only be accessed and completed in the mailbox thread. A fresh
+ * future is installed whenever the input gets blocked while the previous one is already done.
+ */
+ private CompletableFuture<Void> waitForRPC = FutureUtils.completedVoidFuture();
+ /** Only set for externally induced sources. See also {@link #isExternallyInducedSource()}. */
+ private StreamTaskExternallyInducedSourceInput<T> externallyInducedSourceInput;
public SourceOperatorStreamTask(Environment env) throws Exception {
super(env);
@@ -79,14 +100,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
if (operatorChain.isTaskDeployedAsFinished()) {
input = new StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0);
} else if (sourceReader instanceof ExternallyInducedSourceReader) {
- isExternallyInducedSource = true;
-
- input =
+ externallyInducedSourceInput =
new StreamTaskExternallyInducedSourceInput<>(
sourceOperator,
this::triggerCheckpointForExternallyInducedSource,
0,
0);
+
+ input = externallyInducedSourceInput;
} else {
input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
}
@@ -112,20 +133,53 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
+ /**
+ * Triggers a checkpoint requested over RPC. Regular sources trigger it right away. For
+ * externally induced sources, the request is handed to the mailbox, and the checkpoint is only
+ * triggered once the source reader has induced the same checkpoint id.
+ */
@Override
public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
- if (!isExternallyInducedSource) {
- if (isSynchronous(checkpointOptions.getCheckpointType())) {
- return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
- } else {
- return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
- }
- } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
- // see FLINK-25256
- throw new IllegalStateException(
- "Using externally induced sources, we can not enforce taking a full checkpoint."
- + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
- + " either CLAIM or LEGACY mode.");
+ // regular sources: trigger immediately
+ if (!isExternallyInducedSource()) {
+ return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions);
+ }
+ // externally induced sources: completed by triggerCheckpointOnExternallyInducedSource
+ CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+ // immediately move RPC to mailbox so we don't need to synchronize fields
+ mainMailboxExecutor.execute(
+ () ->
+ triggerCheckpointOnExternallyInducedSource(
+ checkpointMetaData, checkpointOptions, triggerFuture),
+ "SourceOperatorStreamTask#triggerCheckpointAsync(%s, %s)",
+ checkpointMetaData,
+ checkpointOptions);
+ return triggerFuture;
+ }
+
+ /**
+ * Returns whether this task reads through a {@link StreamTaskExternallyInducedSourceInput}, i.e.
+ * whether its source reader is an {@link ExternallyInducedSourceReader}.
+ */
+ private boolean isExternallyInducedSource() {
+ return externallyInducedSourceInput != null;
+ }
+
+ /**
+ * Handles a checkpoint RPC for an externally induced source in the mailbox thread. If the reader
+ * has already induced this checkpoint, it is triggered right away. Otherwise, metadata and
+ * options are stored until the reader induces it.
+ *
+ * @param triggerFuture completed with the trigger result, or with {@link #isRunning()} when the
+ *     checkpoint is only stored for later
+ */
+ private void triggerCheckpointOnExternallyInducedSource(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CompletableFuture<Boolean> triggerFuture) {
+ // NOTE(review): only checked when assertions are enabled (-ea)
+ assert (mailboxProcessor.isMailboxThread());
+ if (!triggeredCheckpoints.remove(checkpointMetaData.getCheckpointId())) {
+ // common case: the RPC is received before the source reader triggers the checkpoint;
+ // store metadata and options for later
+ untriggeredCheckpoints.put(
+ checkpointMetaData.getCheckpointId(),
+ new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
+ triggerFuture.complete(isRunning());
+ } else {
+ // rare case: the reader already induced this checkpoint, so trigger it now
+ FutureUtils.forward(
+ triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions),
+ triggerFuture);
+
+ cleanupOldCheckpoints(checkpointMetaData.getCheckpointId());
+ }
+ }
+
+ /**
+ * Triggers the checkpoint immediately. Snapshot types for which {@code isSynchronous} holds go
+ * through {@link #triggerStopWithSavepointAsync}; all others use the regular {@code StreamTask}
+ * checkpoint path.
+ */
+ private CompletableFuture<Boolean> triggerCheckpointNowAsync(
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+ if (isSynchronous(checkpointOptions.getCheckpointType())) {
+ return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
} else {
- return CompletableFuture.completedFuture(isRunning());
+ return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
}
}
@@ -159,22 +213,76 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
output.emitWatermark(Watermark.MAX_WATERMARK);
}
+ /** Drops any pending bookkeeping for the checkpoint before declining it. */
+ @Override
+ protected void declineCheckpoint(long checkpointId) {
+ cleanupCheckpoint(checkpointId);
+ super.declineCheckpoint(checkpointId);
+ }
+
+ /** Schedules removal of the aborted checkpoint's bookkeeping, then delegates to the parent. */
+ @Override
+ public Future<Void> notifyCheckpointAbortAsync(
+ long checkpointId, long latestCompletedCheckpointId) {
+ // cleanup runs in the mailbox because cleanupCheckpoint expects the mailbox thread
+ mainMailboxExecutor.execute(
+ () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+ return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+ }
+
+ /** Schedules removal of the subsumed checkpoint's bookkeeping, then delegates to the parent. */
+ @Override
+ public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+ // cleanup runs in the mailbox because cleanupCheckpoint expects the mailbox thread
+ mainMailboxExecutor.execute(
+ () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+ return super.notifyCheckpointSubsumedAsync(checkpointId);
+ }
+
// --------------------------
+ /**
+ * Hook passed to {@link StreamTaskExternallyInducedSourceInput}; called when the source reader
+ * induces the given checkpoint. If the RPC for it has already arrived, the checkpoint is
+ * triggered with the stored metadata and options. Otherwise the id is remembered and the input
+ * is blocked until the RPC arrives.
+ */
private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
- final CheckpointOptions checkpointOptions =
- CheckpointOptions.forConfig(
- CheckpointType.CHECKPOINT,
- CheckpointStorageLocationReference.getDefault(),
- configuration.isExactlyOnceCheckpointMode(),
- configuration.isUnalignedCheckpointsEnabled(),
- configuration.getAlignedCheckpointTimeout().toMillis());
- final long timestamp = System.currentTimeMillis();
+ UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+ if (untriggeredCheckpoint != null) {
+ // common case: the RPC arrived before the external source induced the checkpoint
+ // NOTE(review): the future returned by triggerCheckpointNowAsync is dropped, so a failed
+ // trigger is not surfaced here -- confirm this is intended
+ triggerCheckpointNowAsync(
+ untriggeredCheckpoint.getMetadata(),
+ untriggeredCheckpoint.getCheckpointOptions());
+ cleanupOldCheckpoints(checkpointId);
+ } else {
+ // rare case: the external source induced the checkpoint before its RPC arrived
+ triggeredCheckpoints.add(checkpointId);
+ // only install a new block if none is pending; a pending block stays in place until
+ // triggeredCheckpoints is empty again
+ if (waitForRPC.isDone()) {
+ waitForRPC = new CompletableFuture<>();
+ externallyInducedSourceInput.blockUntil(waitForRPC);
+ }
+ }
+ }
+
+ /**
+ * Cleans up any orphaned checkpoints older than the given, currently triggered checkpoint. Such
+ * checkpoints may remain when a checkpoint is cancelled but the corresponding RPC is lost. To be
+ * safe, checkpoint X is only removed once both the RPC and the trigger for a checkpoint Y > X
+ * have been received. Afterwards, processing is resumed if nothing is waiting for an RPC.
+ */
+ private void cleanupOldCheckpoints(long checkpointId) {
+ assert (mailboxProcessor.isMailboxThread());
+ // headSet/headMap are views of all ids strictly smaller than checkpointId
+ triggeredCheckpoints.headSet(checkpointId).clear();
+ untriggeredCheckpoints.headMap(checkpointId).clear();
+
+ maybeResumeProcessing();
+ }
+
+ /**
+ * Resumes processing once no induced checkpoint is waiting for its RPC; otherwise a no-op.
+ * Completing an already completed {@code waitForRPC} has no effect.
+ */
+ private void maybeResumeProcessing() {
+ assert (mailboxProcessor.isMailboxThread());
- final CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointId, timestamp, timestamp);
+ if (triggeredCheckpoints.isEmpty()) {
+ waitForRPC.complete(null);
+ }
+ }
+
+ /**
+ * Removes temporary data about a cancelled (declined, aborted or subsumed) checkpoint and
+ * resumes processing if nothing is waiting for an RPC anymore.
+ */
+ private void cleanupCheckpoint(long checkpointId) {
+ assert (mailboxProcessor.isMailboxThread());
+ triggeredCheckpoints.remove(checkpointId);
+ untriggeredCheckpoints.remove(checkpointId);
- super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+ maybeResumeProcessing();
}
// ---------------------------
@@ -225,4 +333,23 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
output.emitWatermarkStatus(watermarkStatus);
}
}
+
+ /**
+ * Metadata and options of a checkpoint whose RPC has arrived but which the source reader has
+ * not induced yet.
+ */
+ private static class UntriggeredCheckpoint {
+ private final CheckpointMetaData metadata;
+ private final CheckpointOptions checkpointOptions;
+
+ private UntriggeredCheckpoint(
+ CheckpointMetaData metadata, CheckpointOptions checkpointOptions) {
+ this.metadata = metadata;
+ this.checkpointOptions = checkpointOptions;
+ }
+
+ /** Returns the metadata received with the checkpoint RPC. */
+ public CheckpointMetaData getMetadata() {
+ return metadata;
+ }
+
+ /** Returns the options received with the checkpoint RPC. */
+ public CheckpointOptions getCheckpointOptions() {
+ return checkpointOptions;
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 0ea212a..d3b9f64 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -61,6 +62,8 @@ import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.io.Serializable;
@@ -74,6 +77,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
import static org.assertj.core.api.Assertions.assertThat;
@@ -86,6 +90,10 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
private static final OperatorID OPERATOR_ID = new OperatorID();
private static final int NUM_RECORDS = 10;
+ public static final CheckpointStorageLocationReference SAVEPOINT_LOCATION =
+ new CheckpointStorageLocationReference("Savepoint".getBytes());
+ public static final CheckpointStorageLocationReference CHECKPOINT_LOCATION =
+ new CheckpointStorageLocationReference("Checkpoint".getBytes());
@Test
void testMetrics() throws Exception {
@@ -157,8 +165,35 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
}
}
- @Test
- void testExternallyInducedSource() throws Exception {
+ /**
+ * Arguments for {@code testExternallyInducedSource}: each of the seven checkpoint/savepoint
+ * option variants (savepoint, terminate, suspend, aligned, aligned with timeout, unaligned, not
+ * exactly-once), paired once with {@code rpcFirst = true} and once with {@code rpcFirst = false}.
+ */
+ static Stream<?> provideExternallyInducedParameters() {
+ return Stream.of(
+ CheckpointOptions.alignedNoTimeout(
+ SavepointType.savepoint(SavepointFormatType.CANONICAL),
+ SAVEPOINT_LOCATION),
+ CheckpointOptions.alignedNoTimeout(
+ SavepointType.terminate(SavepointFormatType.CANONICAL),
+ SAVEPOINT_LOCATION),
+ CheckpointOptions.alignedNoTimeout(
+ SavepointType.suspend(SavepointFormatType.CANONICAL),
+ SAVEPOINT_LOCATION),
+ CheckpointOptions.alignedNoTimeout(
+ CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION),
+ CheckpointOptions.alignedWithTimeout(
+ CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION, 123L),
+ CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION),
+ CheckpointOptions.notExactlyOnce(
+ CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION))
+ // each options object is emitted twice: RPC first, then reader trigger first
+ .flatMap(
+ options ->
+ Stream.of(
+ new Object[] {options, true},
+ new Object[] {options, false}));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideExternallyInducedParameters")
+ void testExternallyInducedSource(CheckpointOptions checkpointOptions, boolean rpcFirst)
+ throws Exception {
final int numEventsBeforeCheckpoint = 10;
final int totalNumEvents = 20;
TestingExternallyInducedSourceReader testingReader =
@@ -170,15 +205,47 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
((SourceOperator) testHarness.getStreamTask().mainOperator)
.getSourceReader();
- testHarness.processAll();
+ CheckpointMetaData checkpointMetaData =
+ new CheckpointMetaData(TestingExternallyInducedSourceReader.CHECKPOINT_ID, 2);
+ if (rpcFirst) {
+ testHarness.streamTask.triggerCheckpointAsync(
+ checkpointMetaData, checkpointOptions);
+ testHarness.processAll();
+ } else {
+ do {
+ testHarness.processSingleStep();
+ } while (!runtimeTestingReader.shouldTriggerCheckpoint().isPresent());
+ // stream task should block when trigger received but no RPC
+ assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isFalse();
+ CompletableFuture<Boolean> triggerCheckpointAsync =
+ testHarness.streamTask.triggerCheckpointAsync(
+ checkpointMetaData, checkpointOptions);
+ // process mails until checkpoint has been processed
+ while (!triggerCheckpointAsync.isDone()) {
+ testHarness.processSingleStep();
+ }
+ // stream task should be unblocked now
+ assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isTrue();
+ testHarness.processAll();
+ }
- assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents);
+ int expectedEvents =
+ checkpointOptions.getCheckpointType().isSavepoint()
+ && ((SavepointType) checkpointOptions.getCheckpointType())
+ .isSynchronous()
+ ? numEventsBeforeCheckpoint
+ : totalNumEvents;
+ assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(expectedEvents);
assertThat(runtimeTestingReader.checkpointed).isTrue();
assertThat(runtimeTestingReader.checkpointedId)
.isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID);
assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint);
Assertions.assertThat(testHarness.getOutput())
- .contains(new CheckpointBarrier(2, 2, checkpointOptions));
+ .contains(
+ new CheckpointBarrier(
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions));
}
}
@@ -262,7 +329,7 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
new CheckpointMetaData(2, 2),
CheckpointOptions.alignedNoTimeout(
SavepointType.terminate(SavepointFormatType.CANONICAL),
- CheckpointStorageLocationReference.getDefault()));
+ SAVEPOINT_LOCATION));
checkpointCompleted.whenComplete(
(ignored, exception) ->
testHarness.streamTask.notifyCheckpointCompleteAsync(2));