Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/17 22:09:49 UTC

[GitHub] [flink] AHeise opened a new pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

AHeise opened a new pull request #19138:
URL: https://github.com/apache/flink/pull/19138


   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 628208d91133e647a2abb952656274e9ff95a808 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341) 
   * d3bff46f55d08691473eaea920456474b54ef20e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 224cf1d167603784767b915329fadbd1ea24af9a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539) 
   * e7d5817f8120ca9d09bba542879d8274141b644d UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c UNKNOWN
   





[GitHub] [flink] crazyzhou commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
crazyzhou commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r833037741



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,76 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
+
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost. Note, to be safe,
+     * checkpoint X is only removed when both RPC and trigger for a checkpoint Y>X is received.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+
+        maybeResumeProcessing();

Review comment:
       Do we need to call this here, given that it is already called at the end?
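
For readers following the hunk above, the bookkeeping it introduces can be modelled in isolation. The sketch below is a simplified, hypothetical model and not Flink code: the class name, the `onRpc` method, and the `String` stand-in for checkpoint metadata/options are assumptions made for illustration. Only the source-side branch and the `headSet`/`headMap` cleanup mirror what the diff shows; the RPC-side branch is a guess at the symmetric counterpart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, simplified model of the handshake in the diff; not Flink code. */
final class CheckpointHandshakeSketch {
    // Barriers received over RPC that the external source has not induced yet.
    private final TreeMap<Long, String> untriggeredCheckpoints = new TreeMap<>();
    // Checkpoints the external source induced before the RPC arrived.
    private final TreeSet<Long> triggeredCheckpoints = new TreeSet<>();
    // Checkpoints that were actually started, in start order.
    final List<Long> started = new ArrayList<>();

    /** Assumed RPC side: store the barrier, or start now if the source came first. */
    void onRpc(long checkpointId, String options) {
        if (triggeredCheckpoints.remove(checkpointId)) {
            cleanupOldCheckpoints(checkpointId);
            started.add(checkpointId);
        } else {
            untriggeredCheckpoints.put(checkpointId, options);
        }
    }

    /** Source side, as in the diff: replay the stored barrier or record the trigger. */
    void onExternalTrigger(long checkpointId) {
        String options = untriggeredCheckpoints.remove(checkpointId);
        if (options != null) {
            // common case: RPC arrived before the external source induced it
            cleanupOldCheckpoints(checkpointId);
            started.add(checkpointId);
        } else {
            // rare case: external source induced first
            triggeredCheckpoints.add(checkpointId);
        }
    }

    /** Drops every orphaned entry with an id strictly below the given checkpoint. */
    private void cleanupOldCheckpoints(long checkpointId) {
        triggeredCheckpoints.headSet(checkpointId).clear();
        untriggeredCheckpoints.headMap(checkpointId).clear();
    }

    /** Number of checkpoints still waiting for their other half. */
    int pending() {
        return untriggeredCheckpoints.size() + triggeredCheckpoints.size();
    }

    public static void main(String[] args) {
        CheckpointHandshakeSketch s = new CheckpointHandshakeSketch();
        s.onRpc(1L, "a");          // checkpoint 1 is later orphaned (never induced)
        s.onRpc(2L, "b");
        s.onExternalTrigger(2L);   // starts 2 and drops the orphaned 1
        s.onExternalTrigger(3L);   // source first: wait for the RPC
        s.onRpc(3L, "c");          // RPC arrives: starts 3
        System.out.println(s.started + " pending=" + s.pending());
    }
}
```

Because `headSet` and `headMap` return live views that exclude the bound, clearing them removes only ids strictly smaller than the checkpoint being started, which matches the Javadoc's rule that checkpoint X is dropped only once both halves of some Y>X have been seen.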







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 628208d91133e647a2abb952656274e9ff95a808 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 5e263a620aa70714d4958dd4dc47ad1b1b3866a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33655) 
   * a64229d137a6c0f81f5bf758506ea21bd075c8da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33885) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274) 
   * 61c748931fa19d8c1fefbee20e276d95b1687084 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * d3bff46f55d08691473eaea920456474b54ef20e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322",
       "triggerID" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "triggerType" : "PUSH"
     }, {
       "hash" : "628208d91133e647a2abb952656274e9ff95a808",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341",
       "triggerID" : "628208d91133e647a2abb952656274e9ff95a808",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372",
       "triggerID" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480",
       "triggerID" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3bff46f55d08691473eaea920456474b54ef20e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372) 
   * 94f2b52265123f5062c31b80d81dd0ef8fd24161 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322",
       "triggerID" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "triggerType" : "PUSH"
     }, {
       "hash" : "628208d91133e647a2abb952656274e9ff95a808",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341",
       "triggerID" : "628208d91133e647a2abb952656274e9ff95a808",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372",
       "triggerID" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480",
       "triggerID" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524",
       "triggerID" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 94f2b52265123f5062c31b80d81dd0ef8fd24161 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480) 
   * f993e8fec32cee34fac51d298b618bd4284a96ec Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322",
       "triggerID" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "triggerType" : "PUSH"
     }, {
       "hash" : "628208d91133e647a2abb952656274e9ff95a808",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341",
       "triggerID" : "628208d91133e647a2abb952656274e9ff95a808",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372",
       "triggerID" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480",
       "triggerID" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524",
       "triggerID" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "224cf1d167603784767b915329fadbd1ea24af9a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539",
       "triggerID" : "224cf1d167603784767b915329fadbd1ea24af9a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a399eaee91353d104be43a8b8f9bc40068274f1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33598",
       "triggerID" : "7a399eaee91353d104be43a8b8f9bc40068274f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e263a620aa70714d4958dd4dc47ad1b1b3866a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33655",
       "triggerID" : "5e263a620aa70714d4958dd4dc47ad1b1b3866a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a399eaee91353d104be43a8b8f9bc40068274f1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33598) 
   * 5e263a620aa70714d4958dd4dc47ad1b1b3866a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33655) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829919903



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -113,19 +127,45 @@ public void init() throws Exception {
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
+            return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions);
+        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(
+                () ->
+                        triggerCheckpointOnExternallyInducedSource(
+                                checkpointMetaData, checkpointOptions, triggerFuture),
+                "SourceOperatorStreamTask#triggerCheckpointAsync");
+        return triggerFuture;
+    }
+
+    private void triggerCheckpointOnExternallyInducedSource(
+            CheckpointMetaData checkpointMetaData,
+            CheckpointOptions checkpointOptions,
+            CompletableFuture<Boolean> triggerFuture) {
+        // cleanup any old checkpoint that was cancelled before trigger
+        triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();
+        if (!triggeredCheckpoints.remove(checkpointMetaData.getCheckpointId())) {
+            // common case: RPC is received before source reader triggers checkpoint
+            // store metadata and options for later
+            untriggeredCheckpoints.put(
+                    checkpointMetaData.getCheckpointId(),
+                    new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
+            triggerFuture.complete(isRunning());
+        } else {
+            // not externally induced or trigger already received (rare case)

Review comment:
       I guess the comment is wrong now? Only `trigger already received (rare case)` still applies, right?
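
For readers following the diff, the two-sided hand-off between the trigger RPC and the source-induced trigger can be sketched as a toy model. This is not the PR's code: the class `CheckpointRendezvous`, its method names, and the use of a plain `String` in place of checkpoint metadata and options are assumptions made for illustration. Only the idea of a sorted set of already-induced ids and a sorted map of waiting RPCs is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model, not Flink code: pairs an RPC trigger with a source-side trigger per checkpoint id. */
public class CheckpointRendezvous {
    // ids the source has already induced, still waiting for the RPC
    private final TreeSet<Long> triggered = new TreeSet<>();
    // RPC payloads waiting for the source (a String stands in for metadata and options)
    private final TreeMap<Long, String> untriggered = new TreeMap<>();
    // checkpoints that actually started, in start order
    final List<String> started = new ArrayList<>();

    /** Called when the trigger RPC arrives. */
    void onRpc(long id, String options) {
        triggered.headSet(id).clear(); // drop older, orphaned source triggers
        if (triggered.remove(id)) {
            started.add(id + ":" + options); // rare case: source induced first
        } else {
            untriggered.put(id, options); // common case: remember the RPC for later
        }
    }

    /** Called when the external source induces the checkpoint. */
    void onSourceTrigger(long id) {
        String options = untriggered.remove(id);
        if (options != null) {
            triggered.headSet(id).clear();   // clean up anything older than this id
            untriggered.headMap(id).clear();
            started.add(id + ":" + options); // common case: RPC came first
        } else {
            triggered.add(id); // rare case: wait for the RPC
        }
    }

    public static void main(String[] args) {
        CheckpointRendezvous r = new CheckpointRendezvous();
        r.onRpc(1, "CHECKPOINT");      // RPC first
        r.onSourceTrigger(1);          // starts checkpoint 1
        r.onSourceTrigger(2);          // source first
        r.onRpc(2, "FULL_CHECKPOINT"); // starts checkpoint 2
        System.out.println(r.started); // prints [1:CHECKPOINT, 2:FULL_CHECKPOINT]
    }
}
```

In this model the branch that starts the checkpoint from `onRpc` is reached only when the source trigger was already recorded, which matches the reading that the comment should mention only the rare case.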

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -109,23 +125,55 @@ public void init() throws Exception {
                         this::getAsyncCheckpointStartDelayNanos);
     }
 
+    @Override
+    protected void processInput(Controller controller) throws Exception {
+        if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {

Review comment:
       Doesn't this cause hot looping? It also adds an extra condition on the hot path, doesn't it?
   
   Would it be possible to call `suspendDefaultAction` when we add something to `triggeredCheckpoints` and resume the default action once `triggeredCheckpoints` is empty?
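
A rough illustration of the suspend-and-resume idea, as a toy loop rather than Flink's mailbox processor. The class `SuspendableLoop` and all of its methods are hypothetical and exist only to show why suspending avoids both the spin and the per-iteration check. The real task would go through the mailbox controller, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy mailbox loop, not Flink code: the default action is suspended instead of polled. */
public class SuspendableLoop {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean suspended;
    int defaultActionRuns;

    void suspendDefaultAction() { suspended = true; }
    void resumeDefaultAction() { suspended = false; }
    void send(Runnable mail) { mailbox.add(mail); }

    /** Runs at most maxSteps iterations; stops early when suspended with no mail left. */
    void run(int maxSteps) {
        for (int i = 0; i < maxSteps; i++) {
            Runnable mail = mailbox.poll();
            if (mail != null) {
                mail.run(); // mails always take priority over the default action
            } else if (!suspended) {
                defaultActionRuns++; // stands in for processInput
            } else {
                return; // a real loop would block here until mail arrives
            }
        }
    }

    public static void main(String[] args) {
        SuspendableLoop loop = new SuspendableLoop();
        loop.run(3);                          // three default-action runs
        loop.suspendDefaultAction();          // e.g. an id was added to the pending set
        loop.run(3);                          // returns immediately, no spinning
        loop.send(loop::resumeDefaultAction); // e.g. the pending set became empty
        loop.run(3);                          // one mail, then two default-action runs
        System.out.println(loop.defaultActionRuns); // prints 5
    }
}
```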

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +207,54 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+        }
+    }
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+    }
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+    /** Remove temporary data about a canceled checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {

Review comment:
       Shouldn't we do this in the mailbox? We are modifying fields that are accessed from both the RPC thread and the mailbox.
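
To make the concern concrete, here is a minimal sketch of confining such fields to one thread. It assumes a plain single-threaded `ExecutorService` as a stand-in for the task mailbox. The class `MailboxConfinement` and its methods are hypothetical and are not part of the PR.

```java
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model, not Flink code: every access to the set is funneled through one thread. */
public class MailboxConfinement {
    // only ever touched from the single "mailbox" thread, so no locking is needed
    private final TreeSet<Long> triggeredCheckpoints = new TreeSet<>();
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    /** May be called from any thread; the mutation itself runs in the mailbox thread. */
    CompletableFuture<Void> addTriggered(long id) {
        return CompletableFuture.runAsync(() -> triggeredCheckpoints.add(id), mailbox);
    }

    /** Cleanup requested from an RPC thread is also enqueued rather than run in place. */
    CompletableFuture<Void> cleanupCheckpoint(long id) {
        return CompletableFuture.runAsync(() -> triggeredCheckpoints.remove(id), mailbox);
    }

    CompletableFuture<Integer> pendingCount() {
        return CompletableFuture.supplyAsync(triggeredCheckpoints::size, mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }

    public static void main(String[] args) throws Exception {
        MailboxConfinement task = new MailboxConfinement();
        task.addTriggered(1);
        task.addTriggered(2);
        task.cleanupCheckpoint(1); // queued behind the two adds, so ordering is preserved
        System.out.println(task.pendingCount().get()); // prints 1
        task.shutdown();
    }
}
```

Because the executor has a single thread and processes tasks in submission order, the cleanup cannot interleave with a concurrent add, which is the property the review comment asks for.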







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272",
       "triggerID" : "bdda380493d6b1ebecf49ce9dfe6084a56eb99f4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274",
       "triggerID" : "99ed2d64ea1b2741d8baff44b991b1b2ad03de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322",
       "triggerID" : "61c748931fa19d8c1fefbee20e276d95b1687084",
       "triggerType" : "PUSH"
     }, {
       "hash" : "628208d91133e647a2abb952656274e9ff95a808",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341",
       "triggerID" : "628208d91133e647a2abb952656274e9ff95a808",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372",
       "triggerID" : "d3bff46f55d08691473eaea920456474b54ef20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480",
       "triggerID" : "94f2b52265123f5062c31b80d81dd0ef8fd24161",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524",
       "triggerID" : "f993e8fec32cee34fac51d298b618bd4284a96ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "224cf1d167603784767b915329fadbd1ea24af9a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "224cf1d167603784767b915329fadbd1ea24af9a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 94f2b52265123f5062c31b80d81dd0ef8fd24161 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480) 
   * f993e8fec32cee34fac51d298b618bd4284a96ec Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524) 
   * 224cf1d167603784767b915329fadbd1ea24af9a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829559288



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       Not sure about the performance implications here. In general, why don't we process all RPCs directly in the mailbox?
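
For readers following the thread, here is a minimal sketch of the idea behind `mainMailboxExecutor.execute(`: the RPC thread hands its work to the single task thread, so task fields need no synchronization. It assumes a plain single-threaded `ExecutorService` as a stand-in for the mailbox. `MailboxTaskSketch` and its methods are hypothetical names for illustration, not Flink classes, and the body of the mail is a placeholder rather than the PR's logic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a task whose fields are only touched by one "mailbox" thread. */
class MailboxTaskSketch {
    // A single thread plays the role of the mailbox (assumption: not Flink's MailboxExecutor).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Deliberately neither volatile nor synchronized: only the mailbox thread accesses it.
    private long lastTriggeredCheckpointId = -1;

    /** May be called from any RPC thread; the actual work runs on the mailbox thread. */
    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
        mailbox.execute(
                () -> {
                    try {
                        lastTriggeredCheckpointId = checkpointId;
                        triggerFuture.complete(true);
                    } catch (Throwable t) {
                        triggerFuture.completeExceptionally(t);
                    }
                });
        return triggerFuture;
    }

    /** Reads the field through the mailbox as well, so the read is also single-threaded. */
    CompletableFuture<Long> lastTriggered() {
        return CompletableFuture.supplyAsync(() -> lastTriggeredCheckpointId, mailbox);
    }

    void close() {
        mailbox.shutdown();
    }
}
```

The cost raised in the review comment is visible here: every RPC becomes one extra enqueued task ("mail"), which is why the number of mails per checkpoint matters.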







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   



[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r830864828



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +207,54 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+        }
+    }
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+    /**
+     * Clean up any orphaned checkpoints older than the currently triggered checkpoint. These
+     * checkpoints may occur when a checkpoint is cancelled but the RPC is lost.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+    }
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+    /** Remove temporary data about a canceled checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {

Review comment:
       Good catch. I keep forgetting that RPCs are not executed in the mailbox. :/
   I have now added assertions and route the cleanup calls through the mailbox in the async RPC calls (`declineCheckpoint` already runs in the mailbox).
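
The bookkeeping in the hunk above can be sketched in isolation as follows. This is a simplified illustration under assumptions, not the PR's code: `ExternallyInducedRendezvousSketch`, its method names, and the `String` options payload are hypothetical, and the RPC-side logic in `onRpc` is an assumption because that part of the diff is not shown in this thread. All methods are assumed to run on a single thread (the mailbox), so plain collections are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch: a checkpoint starts once both the RPC and the source have announced it. */
class ExternallyInducedRendezvousSketch {
    // Announced over RPC, not yet induced by the source (value: hypothetical options payload).
    private final TreeMap<Long, String> untriggered = new TreeMap<>();
    // Induced by the source before the matching RPC arrived.
    private final TreeSet<Long> triggered = new TreeSet<>();
    // Checkpoints that were actually started, in order.
    final List<String> started = new ArrayList<>();

    /** RPC side (assumed logic): start immediately if the source was first, else remember it. */
    void onRpc(long id, String options) {
        if (triggered.remove(id)) {
            cleanupOld(id);
            started.add(id + ":" + options);
        } else {
            untriggered.put(id, options);
        }
    }

    /** Source side, mirroring triggerCheckpointForExternallyInducedSource in the diff. */
    void onInducedBySource(long id) {
        String options = untriggered.remove(id);
        if (options != null) {
            // common case: the RPC arrived before the source induced the checkpoint
            cleanupOld(id);
            started.add(id + ":" + options);
        } else {
            // rare case: the source induced the checkpoint first
            triggered.add(id);
        }
    }

    /** Decline, abort, or subsume: drop temporary data for that checkpoint. */
    void onAbortedOrDeclined(long id) {
        untriggered.remove(id);
        triggered.remove(id);
    }

    /** Drops orphaned entries with an id strictly smaller than the given one. */
    private void cleanupOld(long id) {
        triggered.headSet(id).clear();
        untriggered.headMap(id).clear();
    }

    int pending() {
        return untriggered.size() + triggered.size();
    }
}
```

Sorted collections make the orphan cleanup a single `headMap`/`headSet` call, which relies on checkpoint ids increasing over time.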







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 628208d91133e647a2abb952656274e9ff95a808 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341) 
   * d3bff46f55d08691473eaea920456474b54ef20e UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   



[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829855978



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       But it still requires 2 mails for checkpoints without externally induced sources, doesn't it?







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 7a399eaee91353d104be43a8b8f9bc40068274f1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33598) 
   * 5e263a620aa70714d4958dd4dc47ad1b1b3866a8 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * d3bff46f55d08691473eaea920456474b54ef20e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372) 
   * 94f2b52265123f5062c31b80d81dd0ef8fd24161 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c UNKNOWN
   



[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829881790



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       That actually looks very similar to the code I wrote ;) so I think I am fine with it.
   
   I will continue reviewing the PR, as I have not read through everything yet.







[GitHub] [flink] dawidwys commented on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1077445474


   On the matter of writing into separate files instead of keeping the data inside the metadata, you might want to have a look at `state.storage.fs.memory-threshold`.





[GitHub] [flink] AHeise merged pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise merged pull request #19138:
URL: https://github.com/apache/flink/pull/19138


   





[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829847948



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -162,19 +191,21 @@ protected void advanceToEndOfEventTime() {
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
-
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
-
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+        // cleanup any old checkpoint that was cancelled before trigger
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            // common case: RPC before external sources induces it
+            super.triggerCheckpointAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            // note that at this point, we should probably not emit more data such that data is
+            // properly aligned
+            // however, unless we receive a reliable checkpoint abort RPC, this may deadlock

Review comment:
       We should probably discuss whether this is the best choice.







[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r830865332



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -109,23 +125,55 @@ public void init() throws Exception {
                         this::getAsyncCheckpointStartDelayNanos);
     }
 
+    @Override
+    protected void processInput(Controller controller) throws Exception {
+        if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {

Review comment:
       I beefed up `StreamTaskExternallyInducedSourceInput` a bit so that the hot path of normal sources is not impacted. PTAL.










[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829559288



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       I quickly reworked it so that it needs 2 mails only in the rare case of externally induced sources. WDYT?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -162,19 +191,21 @@ protected void advanceToEndOfEventTime() {
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
-
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
-
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+        // cleanup any old checkpoint that was cancelled before trigger
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            // common case: RPC before external sources induces it
+            super.triggerCheckpointAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            // note that at this point, we should probably not emit more data such that data is
+            // properly aligned
+            // however, unless we receive a reliable checkpoint abort RPC, this may deadlock

Review comment:
       We should probably discuss whether this is the best choice.
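
To make the two orderings in the hunk above concrete, here is a minimal sketch of the rendezvous between the RPC trigger and the source-induced trigger. It is a simplification under assumptions: the field names mirror the diff, but the `String` options, the method names, and the `emittedBarriers` list are hypothetical stand-ins, and everything is assumed to run on a single thread (the mailbox). It does not model pausing data emission in the rare case, which is exactly the open question raised in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model: a barrier is emitted once both the RPC and the source have spoken. */
final class ExternallyInducedRendezvousSketch {
    // RPC arrived, source has not induced yet (checkpoint id -> stand-in for the options).
    private final SortedMap<Long, String> untriggeredCheckpoints = new TreeMap<>();
    // Source induced, RPC has not arrived yet.
    private final SortedSet<Long> triggeredCheckpoints = new TreeSet<>();
    // Records what would have been emitted downstream, in order.
    final List<String> emittedBarriers = new ArrayList<>();

    /** Called when the coordinator's trigger RPC reaches the task. */
    void onRpcTrigger(long checkpointId, String options) {
        if (triggeredCheckpoints.remove(checkpointId)) {
            // rare case: the source induced first, so the RPC completes the pair
            emit(checkpointId, options);
        } else {
            untriggeredCheckpoints.put(checkpointId, options);
        }
    }

    /** Called when the external source asks for the checkpoint. */
    void onSourceInduced(long checkpointId) {
        String options = untriggeredCheckpoints.remove(checkpointId);
        if (options != null) {
            // common case: the RPC was already received, replay its options
            emit(checkpointId, options);
        } else {
            triggeredCheckpoints.add(checkpointId);
        }
    }

    private void emit(long checkpointId, String options) {
        emittedBarriers.add(checkpointId + ":" + options);
    }
}
```

In both orderings the barrier carries the options received over RPC rather than options constructed locally, which is the behavior the PR title describes.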

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(
+                () -> {
+                    // cleanup any old checkpoint that was cancelled before trigger
+                    triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();

Review comment:
       The cleanup here (and the one in #trigger) doesn't work well with concurrent checkpoints. Do we have a way to determine the maximum number of concurrent checkpoints, or can we actually rely on `abortCheckpoint`?
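
One reading of this concern, sketched under assumptions: `headMap(id)` on a sorted map is a view of all keys strictly below `id`, so clearing it discards every older pending checkpoint, including ones that may still be in flight when checkpoints run concurrently. The class and method names below are hypothetical and the `String` values stand in for the stored RPC data.

```java
import java.util.TreeMap;

/** Hypothetical demonstration of what a headMap-based cleanup removes. */
final class HeadMapCleanupSketch {
    /**
     * Returns the entries that would remain pending after the source induces
     * {@code inducedId}, using the same cleanup shape as in the diff.
     * The input map is copied and left untouched.
     */
    static TreeMap<Long, String> pendingAfterInducing(
            TreeMap<Long, String> pending, long inducedId) {
        TreeMap<Long, String> copy = new TreeMap<>(pending);
        // headMap is a live view of keys strictly less than inducedId;
        // clearing it removes those entries from the backing map.
        copy.headMap(inducedId).clear();
        copy.remove(inducedId);
        return copy;
    }
}
```

With checkpoints 5, 6 and 7 pending, inducing 6 leaves only 7: checkpoint 5 is dropped even if it was never aborted. Bounding the cleanup by the maximum number of concurrent checkpoints, or removing entries only on an abort notification, are the two alternatives the comment asks about.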

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       my bad, I didn't force push -.-

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       Yes, it's heavily inspired by your idea, but I didn't get the idea behind `TriggerAction`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 5e263a620aa70714d4958dd4dc47ad1b1b3866a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33655) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 224cf1d167603784767b915329fadbd1ea24af9a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * d3bff46f55d08691473eaea920456474b54ef20e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33372) 
   * 94f2b52265123f5062c31b80d81dd0ef8fd24161 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33480) 
   * f993e8fec32cee34fac51d298b618bd4284a96ec UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * a64229d137a6c0f81f5bf758506ea21bd075c8da Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33885) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274) 
   * 61c748931fa19d8c1fefbee20e276d95b1687084 UNKNOWN
   



[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829849338



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(
+                () -> {
+                    // cleanup any old checkpoint that was cancelled before trigger
+                    triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();

Review comment:
       The cleanup here (and the one in #trigger) doesn't work well with concurrent checkpoints. Do we have a way to determine the maximum number of concurrent checkpoints, or can we actually rely on `abortCheckpoint`?







[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829870130



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       my bad, I didn't force push -.-







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 61c748931fa19d8c1fefbee20e276d95b1687084 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322) 
   * 628208d91133e647a2abb952656274e9ff95a808 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33341) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c UNKNOWN
   



[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r831133207



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,62 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+    }
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+    /** Remove temporary data about a canceled checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());

Review comment:
       You are absolutely right.







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   



[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829881790



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       That actually looks very similar to the code I wrote ;) so I think I am good with it. It's actually better without the `TriggerAction`.
   
   I will continue checking the PR as I have not read through everything.







[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829937062



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       Yes, it's heavily inspired by your idea but I didn't get the idea behind `TriggerAction`.







[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829830935



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       We kind of do. The only check so far outside of the mailbox was a local flag check `isSynchronous(checkpointOptions.getCheckpointType())`. All the rest is immediately executed in the mailbox.
   
   As for the change, the way I see it now is that triggering a checkpoint will require two mailbox actions instead of one:
   one for checking the flags (`isSynchronous`/`externallyInduced` or so) and a second one for executing the actual trigger.
   
   I am a bit hesitant about a change that affects all jobs for the sake of the really rare ones that use `externallyInducedSources`.
   
   The alternatives I see are either 1) rework `triggerStopWithSavepointAsync` and `triggerCheckpointAsync` to not submit a second action, which IMO would not be trivial and poses some risks, or 2) execute the logic for externally induced sources in the mailbox and act on its result. Something like:
   
   ```
   enum TriggerAction {
       IMMEDIATE,
       DELAYED
   }
   
   public CompletableFuture<Boolean> triggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       if (isExternallyInducedSource) {
           return triggerExternallyInducedSourcesCheckpointAsync()
                   .thenCompose(action -> {
                       switch (action) {
                           case IMMEDIATE:
                               return doTriggerCheckpointAsync(...);
                           case DELAYED:
                               return CompletableFuture.completedFuture(isRunning());
                           default:
                               // keeps the lambda well-formed: every path returns or throws
                               throw new IllegalStateException("Unknown action: " + action);
                       }
                   });
       } else {
           return doTriggerCheckpointAsync(...);
       }
   }
   
   CompletableFuture<TriggerAction> triggerExternallyInducedSourcesCheckpointAsync() {
       // run the externally-induced-source logic in the mailbox and report its result
       mainMailboxExecutor.execute(...);
       return action;
   }
   
   CompletableFuture<Boolean> doTriggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       ...
   }
   
   ```
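
For readers who want to try the idea outside of Flink, here is a minimal, self-contained sketch of alternative 2: a single mailbox action decides whether a trigger RPC can fire immediately or has to wait for the source. Everything in it is an assumption made for illustration and is not Flink API: the class `MailboxTriggerSketch`, the methods `induce` and `onTriggerRpc`, the two bookkeeping sets, and the single daemon thread that stands in for the mailbox thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch, not Flink code: one mailbox action decides how to handle a trigger RPC. */
final class MailboxTriggerSketch {

    enum TriggerAction { IMMEDIATE, DELAYED }

    // Assumption: a single daemon thread stands in for the task's mailbox thread.
    private final ExecutorService mailbox =
            Executors.newSingleThreadExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "mailbox-sketch");
                        thread.setDaemon(true);
                        return thread;
                    });

    // Both sets are only touched on the mailbox thread, so they need no synchronization.
    private final Set<Long> inducedBySource = new HashSet<>();
    private final Set<Long> waitingForSource = new HashSet<>();

    /** The source induces a checkpoint; remember it unless its RPC is already waiting. */
    CompletableFuture<Void> induce(long checkpointId) {
        return CompletableFuture.runAsync(
                () -> {
                    if (!waitingForSource.remove(checkpointId)) {
                        inducedBySource.add(checkpointId);
                    }
                },
                mailbox);
    }

    /** The trigger RPC arrives; a single mailbox action decides what to do with it. */
    CompletableFuture<TriggerAction> onTriggerRpc(long checkpointId) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (inducedBySource.remove(checkpointId)) {
                        // The source was first, so the checkpoint can be triggered right away.
                        return TriggerAction.IMMEDIATE;
                    }
                    // The RPC was first, so park it until the source induces the checkpoint.
                    waitingForSource.add(checkpointId);
                    return TriggerAction.DELAYED;
                },
                mailbox);
    }

    public static void main(String[] args) {
        MailboxTriggerSketch task = new MailboxTriggerSketch();
        System.out.println(task.onTriggerRpc(1L).join()); // prints DELAYED
        task.induce(2L).join();
        System.out.println(task.onTriggerRpc(2L).join()); // prints IMMEDIATE
    }
}
```

Because the decision and the state update happen in the same mailbox action, the RPC thread never reads task fields directly; it only composes on the returned future.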

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       But it still requires 2 mails for checkpoints without externally induced sources, doesn't it?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       That looks actually very similar to the code I wrote ;) so I think I am good with it.
   
   I will continue checking the PR as I have not read through everything.








[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r830879183



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,62 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+    }
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+    /** Remove temporary data about a canceled checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());

Review comment:
       Shouldn't we potentially unblock the input here if the only pending checkpoint was aborted, declined, or cancelled?
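
One way the concern could be addressed is sketched below, under stated assumptions: when cleanup removes the last checkpoint that the source induced ahead of its RPC, the future that blocks the input is completed. The class `UnblockOnCleanupSketch` and the helpers `sourceInducedFirst` and `isInputBlocked` are hypothetical; only the field names `triggeredCheckpoints` and `waitForRPC` and the method name `cleanupCheckpoint` are borrowed from the diff above. This is not necessarily how the PR resolves the question.

```java
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch, not Flink code: unblock the input once no induced checkpoint is pending. */
final class UnblockOnCleanupSketch {

    // Checkpoints the source induced before the corresponding RPC arrived.
    private final TreeSet<Long> triggeredCheckpoints = new TreeSet<>();

    // Completed while the input may be read; replaced by an open future while it is blocked.
    private CompletableFuture<Void> waitForRPC = CompletableFuture.completedFuture(null);

    /** Rare case from the diff: the source induced first, so block the input. */
    void sourceInducedFirst(long checkpointId) {
        triggeredCheckpoints.add(checkpointId);
        if (waitForRPC.isDone()) {
            waitForRPC = new CompletableFuture<>();
        }
    }

    /** Remove data about an aborted, declined, or subsumed checkpoint and unblock if possible. */
    void cleanupCheckpoint(long checkpointId) {
        triggeredCheckpoints.remove(checkpointId);
        if (triggeredCheckpoints.isEmpty()) {
            // Completing an already completed future is a harmless no-op.
            waitForRPC.complete(null);
        }
    }

    boolean isInputBlocked() {
        return !waitForRPC.isDone();
    }

    public static void main(String[] args) {
        UnblockOnCleanupSketch task = new UnblockOnCleanupSketch();
        task.sourceInducedFirst(1L);
        task.sourceInducedFirst(2L);
        task.cleanupCheckpoint(1L);
        System.out.println(task.isInputBlocked()); // prints true
        task.cleanupCheckpoint(2L);
        System.out.println(task.isInputBlocked()); // prints false
    }
}
```

The sketch has no locking because, as in the diff, these methods are assumed to run only on the mailbox thread.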

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -125,17 +130,10 @@ public void init() throws Exception {
                         this::getAsyncCheckpointStartDelayNanos);
     }
 
-    @Override
-    protected void processInput(Controller controller) throws Exception {
-        if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {
-            super.processInput(controller);
-        }
-    }
-
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
+        if (!isExternallInducedSource()) {

Review comment:
       typo: `isExternallyInducedSource`







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * f993e8fec32cee34fac51d298b618bd4284a96ec Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33524) 
   * 224cf1d167603784767b915329fadbd1ea24af9a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539) 
   



[GitHub] [flink] AHeise commented on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1073943008


   > How hard would it be to add a test for the blocking/unblocking of the externally induced source?
   
   I have added assertions to the main test method that cover that. Please check whether you think additional test cases are needed.





[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 224cf1d167603784767b915329fadbd1ea24af9a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539) 
   * 7a399eaee91353d104be43a8b8f9bc40068274f1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33598) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 7a399eaee91353d104be43a8b8f9bc40068274f1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33598) 
   



[GitHub] [flink] crazyzhou commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
crazyzhou commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r833037741



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,76 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        mainMailboxExecutor.execute(
+                () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+            if (waitForRPC.isDone()) {
+                waitForRPC = new CompletableFuture<>();
+                externallyInducedSourceInput.blockUntil(waitForRPC);
+            }
+        }
+    }
+
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is lost. Note, to be safe,
+     * checkpoint X is only removed when both RPC and trigger for a checkpoint Y>X is received.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        assert (mailboxProcessor.isMailboxThread());
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+
+        maybeResumeProcessing();

Review comment:
       Do we need to call this here, given that it is already called at the end?
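
For readers following the diff above, the bookkeeping it introduces can be sketched in isolation. This is a minimal sketch, not Flink code: `CheckpointReconciler`, `onRpc`, `onSourceTrigger`, the `String` options and the `inputBlocked` flag are hypothetical stand-ins, and the RPC-side handling in `onRpc` is an assumption, since that part of the change is not quoted in this comment. Only the two sorted collections and the `headSet`/`headMap` cleanup mirror the quoted lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for the task's checkpoint bookkeeping (not a Flink class). */
class CheckpointReconciler {
    // Checkpoints announced over RPC that the source has not induced yet (id -> options).
    private final TreeMap<Long, String> untriggered = new TreeMap<>();
    // Checkpoints induced by the source before the matching RPC arrived.
    private final TreeSet<Long> triggered = new TreeSet<>();
    // Ids of checkpoints that were actually started, in start order.
    final List<Long> started = new ArrayList<>();
    // Stands in for blocking the source input until the RPC arrives.
    boolean inputBlocked = false;

    /** Assumed RPC path: the coordinator announces a checkpoint. */
    void onRpc(long checkpointId, String options) {
        if (triggered.remove(checkpointId)) {
            // rare case: the source induced first, so start now and maybe unblock
            cleanupOld(checkpointId);
            started.add(checkpointId);
            inputBlocked = !triggered.isEmpty();
        } else {
            // common case: remember the RPC until the source induces the checkpoint
            untriggered.put(checkpointId, options);
        }
    }

    /** The external source induces a checkpoint. */
    void onSourceTrigger(long checkpointId) {
        String options = untriggered.remove(checkpointId);
        if (options != null) {
            // common case: the RPC arrived before the source induced the checkpoint
            cleanupOld(checkpointId);
            started.add(checkpointId);
        } else {
            // rare case: the source induced first, so wait for the RPC
            triggered.add(checkpointId);
            inputBlocked = true;
        }
    }

    /** Drops orphaned entries with an id strictly smaller than the given one. */
    private void cleanupOld(long checkpointId) {
        triggered.headSet(checkpointId).clear();
        untriggered.headMap(checkpointId).clear();
    }

    /** Number of checkpoints still waiting for their counterpart. */
    int pending() {
        return untriggered.size() + triggered.size();
    }
}
```

In this sketch, both `headSet` and `headMap` return live views over the entries below the given id, so clearing the view removes the orphans from the backing collection. Whether an additional resume call is needed inside the cleanup itself, which is what the review question is about, depends on code that is not shown here.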







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 5e263a620aa70714d4958dd4dc47ad1b1b3866a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33655) 
   * a64229d137a6c0f81f5bf758506ea21bd075c8da UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * bdda380493d6b1ebecf49ce9dfe6084a56eb99f4 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33272) 
   



[GitHub] [flink] dawidwys commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829830935



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       We kind of do. So far, the only check outside of the mailbox was a local flag check, `isSynchronous(checkpointOptions.getCheckpointType())`. Everything else is executed in the mailbox immediately.
   
   As for the change, the way I see it, triggering a checkpoint will now require two mailbox actions instead of one:
   one for checking the flags (`isSynchronous`/`externallyInduced` or so) and a second one for executing the actual trigger.
   
   I am a bit hesitant about a change that affects all jobs for the sake of the really rare ones that use `externallyInducedSources`.
   
   The alternatives I see are either 1) rework `triggerStopWithSavepointAsync` and `triggerCheckpointAsync` to not submit a second action, which IMO would not be trivial and poses some risks, or 2) execute the logic for externally induced sources in the mailbox and act on its result. Something like:
   
   ```
   enum TriggerAction {
       IMMEDIATE,
       DELAYED
   }
   
   public CompletableFuture<Boolean> triggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       if (isExternallyInducedSource) {
           // decide inside the mailbox first, then act on the result
           return triggerExternallyInducedSourcesCheckpointAsync()
                   .thenCompose(
                           action -> {
                               switch (action) {
                                   case IMMEDIATE:
                                       return doTriggerCheckpointAsync(
                                               checkpointMetaData, checkpointOptions);
                                   case DELAYED:
                                       return CompletableFuture.completedFuture(isRunning());
                                   default:
                                       throw new IllegalStateException("Unexpected action: " + action);
                               }
                           });
       } else {
           return doTriggerCheckpointAsync(...);
       }
   }
   
   CompletableFuture<TriggerAction> triggerExternallyInducedSourcesCheckpointAsync() {
       mainMailboxExecutor.execute(...);
       return action;
   }
   
   CompletableFuture<Boolean> doTriggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       ...
   }
   ```
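
A runnable variant of the idea above may help to check the mail count. This is only a sketch under stated assumptions: a single-threaded `ExecutorService` stands in for the task mailbox, and `TwoPhaseTriggerSketch`, `decideInMailbox`, the `sourceAlreadyTriggered` flag and the `mails` counter are hypothetical names that do not exist in Flink. It shows that regular sources need one mail per trigger, while externally induced sources need one mail for the decision and a second one only if the checkpoint starts immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; a single-threaded executor plays the role of the mailbox. */
class TwoPhaseTriggerSketch {
    enum TriggerAction { IMMEDIATE, DELAYED }

    private final ExecutorService mailbox =
            Executors.newSingleThreadExecutor(
                    r -> {
                        Thread t = new Thread(r, "mailbox");
                        t.setDaemon(true); // do not keep the JVM alive
                        return t;
                    });
    private final boolean externallyInduced;
    private final boolean sourceAlreadyTriggered;
    private final boolean running = true;
    // Counts how many actions were executed in the mailbox.
    final AtomicInteger mails = new AtomicInteger();

    TwoPhaseTriggerSketch(boolean externallyInduced, boolean sourceAlreadyTriggered) {
        this.externallyInduced = externallyInduced;
        this.sourceAlreadyTriggered = sourceAlreadyTriggered;
    }

    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        if (!externallyInduced) {
            // regular sources: a single mail, as before
            return doTriggerCheckpointAsync(checkpointId);
        }
        // externally induced sources: decide in the mailbox, then act on the result
        return decideInMailbox(checkpointId)
                .thenCompose(
                        action ->
                                action == TriggerAction.IMMEDIATE
                                        ? doTriggerCheckpointAsync(checkpointId)
                                        : CompletableFuture.completedFuture(running));
    }

    private CompletableFuture<TriggerAction> decideInMailbox(long checkpointId) {
        return CompletableFuture.supplyAsync(
                () -> {
                    mails.incrementAndGet();
                    return sourceAlreadyTriggered
                            ? TriggerAction.IMMEDIATE
                            : TriggerAction.DELAYED;
                },
                mailbox);
    }

    private CompletableFuture<Boolean> doTriggerCheckpointAsync(long checkpointId) {
        return CompletableFuture.supplyAsync(
                () -> {
                    mails.incrementAndGet();
                    return true; // the real trigger logic is elided
                },
                mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

The `thenCompose` callback never blocks the mailbox thread: it only submits the second action and returns its future, so the single-threaded executor cannot deadlock on itself.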







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 99ed2d64ea1b2741d8baff44b991b1b2ad03de3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33274) 
   * 61c748931fa19d8c1fefbee20e276d95b1687084 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33322) 
   * 628208d91133e647a2abb952656274e9ff95a808 UNKNOWN
   



[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829847489



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       I quickly reworked it, such that it only needs 2 mails for the rare case of externally induced sources. WDYT?







[GitHub] [flink] flinkbot edited a comment on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1071624475


   ## CI report:
   
   * 224cf1d167603784767b915329fadbd1ea24af9a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33539) 
   * 7a399eaee91353d104be43a8b8f9bc40068274f1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] AHeise commented on pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on pull request #19138:
URL: https://github.com/apache/flink/pull/19138#issuecomment-1076361857


   This PR has been verified by the flink-pravega maintainers to work on their tests for checkpoints (savepoint test pending).





[GitHub] [flink] AHeise commented on a change in pull request #19138: [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air.

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r830838985



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -109,23 +125,55 @@ public void init() throws Exception {
                         this::getAsyncCheckpointStartDelayNanos);
     }
 
+    @Override
+    protected void processInput(Controller controller) throws Exception {
+        if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {

Review comment:
       Yes, I wanted to avoid it because of the added complexity, but I guess we need to do it. That probably means tinkering with the availability future.
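
The idea named in the PR title (replaying barriers received over RPC rather than inventing them) can be illustrated with a minimal, self-contained sketch. This is not the code from the PR: `BarrierReplaySketch`, `Trigger`, `onRpcTrigger`, `onSourceRequest`, and the string-valued `options` are hypothetical names chosen for illustration, and handling both arrival orders is an assumption about the desired behavior rather than something the thread confirms.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of barrier replay; these are NOT Flink's actual classes. */
class BarrierReplaySketch {

    /** Stand-in for the checkpoint id and options carried by the RPC trigger. */
    static final class Trigger {
        final long checkpointId;
        final String options;

        Trigger(long checkpointId, String options) {
            this.checkpointId = checkpointId;
            this.options = options;
        }
    }

    // RPC triggers that arrived before the source asked for the checkpoint.
    private final Map<Long, Trigger> pendingRpcTriggers = new HashMap<>();
    // Checkpoints the source asked for before the RPC trigger arrived.
    private final Set<Long> pendingSourceRequests = new HashSet<>();
    // Barriers emitted so far, recorded as strings for easy inspection.
    private final List<String> emitted = new ArrayList<>();

    /** Called when the trigger arrives over RPC. */
    void onRpcTrigger(Trigger trigger) {
        if (pendingSourceRequests.remove(trigger.checkpointId)) {
            emit(trigger); // the source was already waiting for this checkpoint
        } else {
            pendingRpcTriggers.put(trigger.checkpointId, trigger);
        }
    }

    /** Called when the externally induced source requests a checkpoint. */
    void onSourceRequest(long checkpointId) {
        Trigger trigger = pendingRpcTriggers.remove(checkpointId);
        if (trigger != null) {
            emit(trigger); // replay the stored trigger with its original options
        } else {
            pendingSourceRequests.add(checkpointId);
        }
    }

    private void emit(Trigger trigger) {
        emitted.add("barrier(" + trigger.checkpointId + ", " + trigger.options + ")");
    }

    List<String> emittedBarriers() {
        return emitted;
    }
}
```

In this sketch a barrier is emitted only once both signals for the same checkpoint id are present, so the options on the barrier always come from the stored trigger and are never made up by the task.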



