You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/23 09:47:10 UTC

[GitHub] [flink] crazyzhou commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

crazyzhou commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r833037741



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,76 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
+
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost. Note, to be safe,
+     * checkpoint X is only removed when both RPC and trigger for a checkpoint Y>X is received.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+
+        maybeResumeProcessing();

Review comment:
       Do we need to call this here as we have it already called at last?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org