Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/31 04:34:25 UTC

[GitHub] [flink] gaoyunhaii opened a new pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

gaoyunhaii opened a new pull request #14820:
URL: https://github.com/apache/flink/pull/14820


   ## What is the purpose of the change
   
   This PR enables triggering the checkpoint barrier handler for non-source tasks.
   
   To support checkpoints after some tasks have finished, non-source tasks may also receive an RPC checkpoint trigger once all of their preceding tasks have finished. In this case, the task needs to trigger the checkpoint barrier handler to insert barriers. This PR provides the functionality to notify the checkpoint barrier handler on an RPC trigger.
   
   ## Brief change log
   
     - a3daa2b88a7c1609daf2bb55742971eabe5dacf0 refactors the current implementation to expose `CheckpointBarrierHandler`.
     - b4baae6e24556aec841fb1c4fcedfec234e3c0cd notifies the non-source stream tasks' checkpoint barrier handler on an RPC trigger.
     - 326d8751ac2aac4255b5e94da8e8f46a424c55c5 fixes the tests that use non-source tasks which directly perform a checkpoint on trigger.
   
   
   ## Verifying this change
   
   Added a unit test to verify that the checkpoint barrier handler gets triggered on an RPC checkpoint trigger.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-859350238


   I thought about it quite a lot yesterday. Sorry for going back and forth with the issue. However, I want to be extra cautious with the checkpointing. I checked the FLIP again and also discussed with Piotr what would be the best general approach for handling the triggering in those corner cases.
   
   Let me recollect what we said in the FLIP. IIRC we will introduce an additional `EndOfData` event which will be hand-shaken between the upstream and downstream task before the upstream task goes away, right? This means it is not possible to get an RPC `triggerCheckpoint` on a downstream task if there is any data in any of the channels. 
   
   Still, there might be barriers incoming through channels that already reported the `EndOfData` as the upstream tasks might've not gone away. In the discussion with Piotr, we said it would be better to minimise the number of declined/delayed checkpoints. Having said that I think your original proposal could come in handy here. 
   
   I'd still do the logic within the `StreamTask`, but I think it's a good idea to make the `CheckpointBarrierHandler` available here. Then we could handle the RPC in the following manner:
   
   ```
           // Just pseudocode; we could probably do it cleaner.
           List<InputGate> notFinishedInputs = getNotFinishedInputs();
           if (notFinishedInputs.isEmpty()) {
               // Channels already closed, we won't see any data nor barriers.
               return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
           } else {
               // We have seen the EndOfData, but there might be some barriers arriving.
               //
               // Theoretically we could trigger/processBarrier for just a single channel, as all
               // channels should've seen the EndOfData and they should not participate in the
               // alignment; we care only about the triggering. However, for the sake of
               // completeness I'd do it for all the not finished channels.
               //
               // Assumed: the barrier is built from the RPC arguments.
               CheckpointBarrier barrier =
                       new CheckpointBarrier(
                               checkpointMetaData.getCheckpointId(),
                               checkpointMetaData.getTimestamp(),
                               checkpointOptions);
               for (InputGate inputGate : notFinishedInputs) {
                   for (InputChannelInfo channelInfo : inputGate.getNotFinishedChannels()) {
                       getCheckpointBarrierHandler().processBarrier(barrier, channelInfo);
                   }
               }
           }
   ```
   
   Therefore it's very much like your original proposal, just that we do not introduce the additional Source/Non-source split and we do not need to expose `isRunning` or `performCheckpoint`.
   
   WDYT?
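   
   The two branches above can also be modelled outside Flink. This is only a sketch under stated assumptions: `RpcTriggerSketch`, `Channel`, and `RecordingHandler` are hypothetical stand-ins rather than Flink classes, and the handler merely records which channels were fed a barrier.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Toy model of the RPC handling above; every type here is hypothetical, not a Flink class. */
   class RpcTriggerSketch {

       /** Stand-in for an input channel that may already be fully consumed. */
       static final class Channel {
           final String name;
           final boolean finished;

           Channel(String name, boolean finished) {
               this.name = name;
               this.finished = finished;
           }
       }

       /** Stand-in for the barrier handler: records the channels that were fed a barrier. */
       static final class RecordingHandler {
           final List<String> barriers = new ArrayList<>();

           void processBarrier(long checkpointId, Channel channel) {
               barriers.add(checkpointId + "@" + channel.name);
           }
       }

       /**
        * Returns "DIRECT" if every channel is finished (the checkpoint can be taken right away),
        * otherwise "VIA_HANDLER" after feeding a barrier to each unfinished channel.
        */
       static String onRpcTrigger(
               long checkpointId, List<Channel> channels, RecordingHandler handler) {
           List<Channel> notFinished = new ArrayList<>();
           for (Channel channel : channels) {
               if (!channel.finished) {
                   notFinished.add(channel);
               }
           }
           if (notFinished.isEmpty()) {
               return "DIRECT";
           }
           for (Channel channel : notFinished) {
               handler.processBarrier(checkpointId, channel);
           }
           return "VIA_HANDLER";
       }

       public static void main(String[] args) {
           RecordingHandler handler = new RecordingHandler();
           List<Channel> channels =
                   Arrays.asList(new Channel("a", true), new Channel("b", false));
           System.out.println(onRpcTrigger(7L, channels, handler) + " " + handler.barriers);
       }
   }
   ```
   
   A task without any input channels falls into the "DIRECT" branch as well, which matches the idea of not needing a separate source/non-source split.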





[GitHub] [flink] dawidwys commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r654200272



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -175,10 +176,23 @@ protected void createInputProcessor(
                         getEnvironment().getTaskInfo());
     }
 
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.ofNullable(checkpointBarrierHandler);
+    }
+
     @Override
     public Future<Boolean> triggerCheckpointAsync(
             CheckpointMetaData metadata, CheckpointOptions options) {
 
+        if (operatorChain.getSourceTaskInputs().size() == 0) {
+            return super.triggerCheckpointAsync(metadata, options);
+        }
+
+        // If there are chained sources, we would always only trigger
+        // the chained sources for checkpoint. This means that for
+        // the checkpoints during the upstream task finished and
+        // this task receives the EndOfPartitionEvent, the checkpoint
+        // would not subsume the pending ones.

Review comment:
       Is that comment correct? We will still subsume the pending checkpoints from within the sources. We do process barriers for the source inputs in that case.
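   
   Subsuming here means that a barrier for a newer checkpoint aborts an older checkpoint that is still pending. A minimal sketch of that rule, assuming checkpoint IDs only increase; `SubsumeSketch` and its methods are hypothetical and do not mirror the actual handler API:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Toy model of checkpoint subsumption; hypothetical, not Flink's barrier handler. */
   class SubsumeSketch {
       private long currentCheckpointId = -1L;
       private boolean pending = false;

       /** IDs of checkpoints that were aborted because a newer one arrived. */
       final List<Long> aborted = new ArrayList<>();

       /** Handles a barrier; returns false if it belongs to an older, already replaced checkpoint. */
       boolean onBarrier(long checkpointId) {
           if (checkpointId < currentCheckpointId) {
               return false; // stale barrier, ignore it
           }
           if (checkpointId > currentCheckpointId) {
               if (pending) {
                   aborted.add(currentCheckpointId); // the newer checkpoint subsumes the pending one
               }
               currentCheckpointId = checkpointId;
               pending = true;
           }
           return true;
       }

       /** Marks the current checkpoint as taken, so it can no longer be subsumed. */
       void onCheckpointTaken() {
           pending = false;
       }

       public static void main(String[] args) {
           SubsumeSketch sketch = new SubsumeSketch();
           sketch.onBarrier(1L);
           sketch.onBarrier(2L);
           System.out.println(sketch.aborted);
       }
   }
   ```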

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -77,6 +77,12 @@
 
     private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
 
+    /**
+     * TODO Whether enables checkpoints after tasks finished. This is a temporary flag and will be
+     * removed in the last PR.
+     */
+    protected boolean enableCheckpointAfterTasksFinished;

Review comment:
       Why is this `protected`?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1723,6 +1725,86 @@ protected void cancelTask() {
         assertTrue(OpenFailingOperator.wasClosed);
     }
 
+    @Test
+    public void testTriggeringCheckpointWithFinishedChannels() throws Exception {
+        OneInputStreamTaskTestHarness<String, String> testHarness =

Review comment:
       Could you rewrite the test to use the newer `StreamTaskMailboxTestHarness`? I believe it should be fairly straightforward, especially as you've already used it for the multi-input tasks.







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770325573


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 045971a4900dc6798be00108c8e639e1c77fa18c (Fri May 28 08:58:18 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-21085).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7f98711c3de38815d24f2d35a5be5017dadad27c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909) 
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 42cdc662ef4b6416b30017a5c3cb090ee66f7d07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-854630242


   Honestly, I am not convinced about the changes. I don't like exposing the `CheckpointBarrierHandler`. Moreover the `triggerCheckpoint` method does not fit well into the class imo.
   
   You already have two separate methods in the `StreamTask` for triggering a checkpoint, either from RPC or on a barrier. I don't understand why it has to go through so many rounds: `StreamTask#triggerCheckpointAsync` -> `CheckpointBarrierHandler#triggerCheckpoint` -> `StreamTask#triggerCheckpointOnBarrier` -> `StreamTask#performCheckpoint`. Can't you just remove the two middlemen? I think checking whether all inputs are finished in the `StreamTask#triggerCheckpointAsync` of non-source tasks should be enough.





[GitHub] [flink] dawidwys commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-857734904


   All right, I will need to think about it a bit more, but even assuming we can receive an RPC signal while the input channels are still active/non-finished, I feel we can make it easier. I don't think there is a big enough difference between source and non-source tasks to justify extracting a separate hierarchy. The way I see it, the only difference is whether all the input gates have finished or not. (If there are no input gates, all of them are obviously finished ;) ). In that scenario we could extend `triggerCheckpointAsyncInMailbox` (it was renamed on master) to handle both cases quite easily.
   
   I think we don't need to expose the `SingleCheckpointBarrierHandler` in any way. With either my approach or yours, you would need to inject barriers into the channels on the RPC call, or drain the channels somehow, right? Otherwise you cannot properly checkpoint the data, or align with the data, in the channels. Or am I missing something? Therefore you could inject the barrier into the channel, or drain the channel, from the `StreamTask` level and the logic in `SingleCheckpointBarrierHandler` would still be valid.
   
   Let me know what you think.
   
   ```
       @Override
       public Future<Boolean> triggerCheckpointAsync(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
   
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           mainMailboxExecutor.execute(
                   () -> {
                       try {
                           result.complete(
                                   triggerCheckpointAsyncInMailbox(
                                           checkpointMetaData, checkpointOptions));
                       } catch (Exception ex) {
                           // Report the failure both via the Future result but also to the mailbox
                           result.completeExceptionally(ex);
                           throw ex;
                       }
                   },
                   "checkpoint %s with %s",
                   checkpointMetaData,
                   checkpointOptions);
           return result;
       }
   
       private boolean triggerCheckpointAsyncInMailbox(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
           try {
               if (allInputsFinished()) {
                   return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
               } else {
                   throw new UnsupportedOperationException(
                           "We do not support triggering non root nodes yet.");
               }
           } catch (Exception e) {
               // propagate exceptions only if the task is still in "running" state
               if (isRunning) {
                   throw new Exception(
                           "Could not perform checkpoint "
                                   + checkpointMetaData.getCheckpointId()
                                   + " for operator "
                                   + getName()
                                   + '.',
                           e);
               } else {
                   LOG.debug(
                           "Could not perform checkpoint {} for operator {} while the "
                                   + "invokable was not in state running.",
                           checkpointMetaData.getCheckpointId(),
                           getName(),
                           e);
                   return false;
               }
           } finally {
               FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
           }
       }
   
       private boolean allInputsFinished() {
           return getEnvironment().getAllInputGates().length == 0
                   || Arrays.stream(getEnvironment().getAllInputGates())
                           .allMatch(InputGate::isFinished);
       }
   
       private boolean triggerCheckpointInRootNode(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           latestAsyncCheckpointStartDelayNanos =
                   1_000_000
                           * Math.max(
                                   0, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
   
           // No alignment if we inject a checkpoint
           CheckpointMetricsBuilder checkpointMetrics =
                   new CheckpointMetricsBuilder()
                           .setAlignmentDurationNanos(0L)
                           .setBytesProcessedDuringAlignment(0L)
                           .setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
   
           subtaskCheckpointCoordinator.initInputsCheckpoint(
                   checkpointMetaData.getCheckpointId(), checkpointOptions);
   
           boolean success =
                   performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
           if (!success) {
               declineCheckpoint(checkpointMetaData.getCheckpointId());
           }
           return success;
       }
   ```





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 42cdc662ef4b6416b30017a5c3cb090ee66f7d07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570009052



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointableOneInputStreamTask.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+
+/** A test stream task that also response to the checkpoint trigger requirement. */
+public class CheckpointableOneInputStreamTask<IN, OUT> extends OneInputStreamTask<IN, OUT> {

Review comment:
       This is because the logic of this test variant of `OneInputStreamTask` differs from the regular one: it triggers a snapshot directly (like a source stream task) instead of going through the `CheckpointBarrierHandler`. It is mainly used in test cases related to taking checkpoints on the TM side.
   
   After rethinking this, I think these tests should rather call `triggerCheckpointOnBarrier` directly to simulate the case where the alignment has finished. Thus I removed `CheckpointableOneInputStreamTask` and modified the related tests to call `triggerCheckpointOnBarrier` directly.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -877,12 +875,6 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
                 () -> {
-                    latestAsyncCheckpointStartDelayNanos =

Review comment:
       This variable is only used by the true source tasks, which have no `CheckpointBarrierHandler` attached, so initially I thought that moving it to the base class for the source tasks would make this clearer. 
   
   The variable mainly measures the delay between the JM triggering the checkpoint and the (source) task starting to process it (namely `checkpointStartDelayNanos`). Moving it inside `triggerCheckpoint` would only change the measurement by the time of one method call, so I think it would not make a large difference, and we could rename the variable to `latestACheckpointStartDelayNanos`. Do you think this option would also be OK?
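   
   For reference, the measurement discussed here is a clamped difference of two millisecond timestamps converted to nanoseconds. The following is a sketch only; the class and method names are hypothetical, and the formula is taken from how `latestAsyncCheckpointStartDelayNanos` is computed elsewhere in this thread:
   
   ```java
   /** Sketch of the start-delay computation; the class and method names are hypothetical. */
   class CheckpointStartDelay {

       /**
        * Delay in nanoseconds between the trigger timestamp carried by the checkpoint metadata
        * and the moment the task starts handling the checkpoint. The difference is clamped at
        * zero so that a trigger timestamp in the future never yields a negative delay.
        */
       static long startDelayNanos(long nowMillis, long triggerTimestampMillis) {
           return 1_000_000L * Math.max(0L, nowMillis - triggerTimestampMillis);
       }

       public static void main(String[] args) {
           long triggeredAt = System.currentTimeMillis();
           System.out.println(startDelayNanos(System.currentTimeMillis(), triggeredAt));
       }
   }
   ```
   
   Since the value has millisecond resolution, taking the timestamp one method call later should rarely change the result.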

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       Currently it is not abstract mainly because `MultipleInputStreamTask` overrides `triggerCheckpointAsync`; if we made `triggerCheckpoint` abstract, `MultipleInputStreamTask` would have to implement an empty method that it does not use. If we also want `MultipleInputStreamTask` not to override `triggerCheckpointAsync` directly, we have to make the completable future a parameter of `triggerCheckpoint` and let `triggerCheckpoint` complete the future, instead of hiding the future from it. 
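   
   The future-as-parameter alternative could look roughly as follows. This is a simplified sketch, not the actual `StreamTask` code: `FutureParamSketch`, `BaseTask`, `ImmediateTask`, and `DeferredTask` are hypothetical names, and the mailbox is left out entirely.
   
   ```java
   import java.util.concurrent.CompletableFuture;

   /** Sketch of the future-as-parameter alternative; all names here are hypothetical. */
   class FutureParamSketch {

       abstract static class BaseTask {
           /** Entry point shared by all subclasses; no subclass needs to override it. */
           final CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
               CompletableFuture<Boolean> result = new CompletableFuture<>();
               try {
                   triggerCheckpoint(checkpointId, result);
               } catch (Exception e) {
                   result.completeExceptionally(e);
               }
               return result;
           }

           /** Subclasses decide when and how the future is completed. */
           abstract void triggerCheckpoint(long checkpointId, CompletableFuture<Boolean> result)
                   throws Exception;
       }

       /** Completes the future synchronously, like a task that checkpoints right away. */
       static final class ImmediateTask extends BaseTask {
           @Override
           void triggerCheckpoint(long checkpointId, CompletableFuture<Boolean> result) {
               result.complete(true);
           }
       }

       /** Keeps the future and completes it later, e.g. once chained sources have reported. */
       static final class DeferredTask extends BaseTask {
           private CompletableFuture<Boolean> pending;

           @Override
           void triggerCheckpoint(long checkpointId, CompletableFuture<Boolean> result) {
               pending = result;
           }

           void sourcesDone() {
               pending.complete(true);
           }
       }

       public static void main(String[] args) {
           DeferredTask task = new DeferredTask();
           CompletableFuture<Boolean> future = task.triggerCheckpointAsync(1L);
           System.out.println(future.isDone());
           task.sourcesDone();
           System.out.println(future.join());
       }
   }
   ```
   
   The trade-off is that every subclass becomes responsible for completing the future, which is exactly what hiding it inside the base class avoided.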

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       I also committed a diff showing the changes needed if we make `triggerCheckpoint` abstract: https://github.com/apache/flink/pull/14820/commits/83de6dda8fab862126c1e91010141828b93fe466

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       I also committed a diff showing the changes needed if we make `triggerCheckpoint` abstract: https://github.com/apache/flink/pull/14820/commits/42cdc662ef4b6416b30017a5c3cb090ee66f7d07







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5043cb61111d95f503534b53bac93c451ae367e9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706) 
   * 438f3d37438cd7d5128d4e92a128bbcf83952361 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   * 7f98711c3de38815d24f2d35a5be5017dadad27c UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 438f3d37438cd7d5128d4e92a128bbcf83952361 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846) 
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570018055



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -877,12 +875,6 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
                 () -> {
-                    latestAsyncCheckpointStartDelayNanos =

Review comment:
       This variable is only used by the true source tasks, which have no `CheckpointBarrierHandler` attached, so I initially thought that moving it to the base class for source tasks would make things clearer.
   
   The variable mainly measures the delay between the JM triggering a checkpoint and the (source) task starting to process that checkpoint (namely `checkpointStartDelayNanos`). Moving it inside `triggerCheckpoint` would change the measurement only by the duration of one method call, so I think it would not make a large difference, and we could rename the variable to `latestACheckpointStartDelayNanos`. Do you think this option would also be ok?
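
For illustration, the start-delay measurement described above can be sketched in isolation. This is a minimal sketch, not Flink code: the class `CheckpointStartDelay` and the method `startDelayNanos` are hypothetical names, and the arithmetic assumes the same formula that appears later in this thread (the millisecond difference between "now" and the trigger timestamp, clamped at zero and converted to nanoseconds).

```java
/** Hypothetical helper (not part of Flink) that models the checkpoint start-delay computation. */
public final class CheckpointStartDelay {

    private CheckpointStartDelay() {}

    /**
     * Returns the delay, in nanoseconds, between the checkpoint trigger timestamp and the
     * moment the task starts processing the checkpoint. Both arguments are milliseconds
     * since the epoch. A negative difference (for example from clock skew between
     * machines) is clamped to zero.
     */
    public static long startDelayNanos(long triggerTimestampMillis, long nowMillis) {
        return 1_000_000L * Math.max(0L, nowMillis - triggerTimestampMillis);
    }

    public static void main(String[] args) {
        long trigger = 1_000L;
        // Task starts 250 ms after the trigger: prints 250000000
        System.out.println(startDelayNanos(trigger, 1_250L));
        // Local clock is behind the trigger timestamp: prints 0
        System.out.println(startDelayNanos(trigger, 900L));
    }
}
```

Because the value is derived from wall-clock milliseconds, taking the reading one method call later shifts it by far less than the clock's resolution in most cases, which is consistent with the argument that the placement of the assignment matters little.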







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090",
       "triggerID" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "045971a4900dc6798be00108c8e639e1c77fa18c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940",
       "triggerID" : "045971a4900dc6798be00108c8e639e1c77fa18c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05c48a82487c41963697e99a31baeab9e3c82240",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478",
       "triggerID" : "05c48a82487c41963697e99a31baeab9e3c82240",
       "triggerType" : "PUSH"
     }, {
       "hash" : "061dc5ee35fdfaf474382cf6c4e385c7d70258b3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18993",
       "triggerID" : "061dc5ee35fdfaf474382cf6c4e385c7d70258b3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 061dc5ee35fdfaf474382cf6c4e385c7d70258b3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18993) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090",
       "triggerID" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "045971a4900dc6798be00108c8e639e1c77fa18c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940",
       "triggerID" : "045971a4900dc6798be00108c8e639e1c77fa18c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05c48a82487c41963697e99a31baeab9e3c82240",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478",
       "triggerID" : "05c48a82487c41963697e99a31baeab9e3c82240",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 05c48a82487c41963697e99a31baeab9e3c82240 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 326d8751ac2aac4255b5e94da8e8f46a424c55c5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705) 
   * 5043cb61111d95f503534b53bac93c451ae367e9 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090",
       "triggerID" : "274dceea68cb81e8a2c20118fcc2ccf44fcb936b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 274dceea68cb81e8a2c20118fcc2ccf44fcb936b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090) 
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570059373



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       I also committed a diff showing the changes if we make `triggerCheckpoint` abstract: https://github.com/apache/flink/pull/14820/commits/42cdc662ef4b6416b30017a5c3cb090ee66f7d07







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5043cb61111d95f503534b53bac93c451ae367e9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706) 
   * 438f3d37438cd7d5128d4e92a128bbcf83952361 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846) 
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 UNKNOWN
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570009052



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointableOneInputStreamTask.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+
+/** A test stream task that also response to the checkpoint trigger requirement. */
+public class CheckpointableOneInputStreamTask<IN, OUT> extends OneInputStreamTask<IN, OUT> {

Review comment:
       This is because the logic of this test `OneInputStreamTask` differs from the production one: it triggers a snapshot directly (like a source stream task) instead of going through the `CheckpointBarrierHandler`. These tasks are mainly used in test cases related to taking checkpoints on the TM side.
   
   After rethinking this, I believe these tests should directly call `triggerCheckpointOnBarrier` to simulate the case where the alignment has finished. Thus I removed `CheckpointableOneInputStreamTask` and modified the related tests to call `triggerCheckpointOnBarrier` directly.







[GitHub] [flink] dawidwys edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-857734904


   All right, I will need to think about it a bit more, but even assuming we can receive an RPC signal while the input channels are still active/non-finished, I feel we can make it easier. I don't think there is a big enough difference between source and non-source tasks to justify extracting a separate hierarchy. The way I see it, the only difference is whether all the input gates have finished or not. (If there are no input gates, all of them are obviously finished ;) ). In that scenario we could extend `triggerCheckpointAsyncInMailbox` (it was renamed on master) to handle both cases quite easily.
   
   I think we don't need to expose the `SingleCheckpointBarrierHandler` in any way. With either my approach or yours, you would need to inject barriers into the channels on the RPC call, or drain the channels somehow, right? Otherwise you cannot properly checkpoint the data in the channels or align with it. Or am I missing something? Therefore you could inject the barrier into the channel, or drain the channel, from the `StreamTask` level, and the logic in `SingleCheckpointBarrierHandler` would still be valid.
   
   Let me know what you think.
   
   StreamTask.java:
   ```
       @Override
       public Future<Boolean> triggerCheckpointAsync(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
   
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           mainMailboxExecutor.execute(
                   () -> {
                       try {
                           result.complete(
                                   triggerCheckpointAsyncInMailbox(
                                           checkpointMetaData, checkpointOptions));
                       } catch (Exception ex) {
                           // Report the failure both via the Future result but also to the mailbox
                           result.completeExceptionally(ex);
                           throw ex;
                       }
                   },
                   "checkpoint %s with %s",
                   checkpointMetaData,
                   checkpointOptions);
           return result;
       }
   
       private boolean triggerCheckpointAsyncInMailbox(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
           try {
               if (allInputsFinished()) {
                   return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
               } else {
                   throw new UnsupportedOperationException(
                           "We do not support triggering non root nodes yet.");
               }
           } catch (Exception e) {
               // propagate exceptions only if the task is still in "running" state
               if (isRunning) {
                   throw new Exception(
                           "Could not perform checkpoint "
                                   + checkpointMetaData.getCheckpointId()
                                   + " for operator "
                                   + getName()
                                   + '.',
                           e);
               } else {
                   LOG.debug(
                           "Could not perform checkpoint {} for operator {} while the "
                                   + "invokable was not in state running.",
                           checkpointMetaData.getCheckpointId(),
                           getName(),
                           e);
                   return false;
               }
           } finally {
               FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
           }
       }
   
       private boolean allInputsFinished() {
           return getEnvironment().getAllInputGates().length == 0
                   || Arrays.stream(getEnvironment().getAllInputGates())
                           .allMatch(InputGate::isFinished);
       }
   
       private boolean triggerCheckpointInRootNode(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           latestAsyncCheckpointStartDelayNanos =
                   1_000_000
                           * Math.max(
                                   0, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
   
           // No alignment if we inject a checkpoint
           CheckpointMetricsBuilder checkpointMetrics =
                   new CheckpointMetricsBuilder()
                           .setAlignmentDurationNanos(0L)
                           .setBytesProcessedDuringAlignment(0L)
                           .setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
   
           subtaskCheckpointCoordinator.initInputsCheckpoint(
                   checkpointMetaData.getCheckpointId(), checkpointOptions);
   
           boolean success =
                   performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
           if (!success) {
               declineCheckpoint(checkpointMetaData.getCheckpointId());
           }
           return success;
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-859508853


   Hi Dawid, many thanks for the thoughts!
   
   I think the analysis of the `EndOfDataEvent` is exactly right: currently, by the time a task gets triggered, all the user records must already have been processed, so only barriers can possibly remain.
   
   For the current approach, I think it avoids the source / non-source split, since it unifies the checkpoint-triggering logic and distinguishes the different situations via `getNotFinishedChannels()`, right? And `getCheckpointBarrierHandler()` seems to need to be an abstract method, for which source tasks return null while non-source tasks return the actual `CheckpointBarrierHandler`?
   
   I also agree with this method and will update the PR according to this approach. Many thanks for the suggestions!





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d83bee7799051ef32e0f9ec9e26339b23b7dc057",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "42cdc662ef4b6416b30017a5c3cb090ee66f7d07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   * 7f98711c3de38815d24f2d35a5be5017dadad27c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909) 
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 42cdc662ef4b6416b30017a5c3cb090ee66f7d07 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909",
       "triggerID" : "7f98711c3de38815d24f2d35a5be5017dadad27c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   * 7f98711c3de38815d24f2d35a5be5017dadad27c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-854710138


   Hi @dawidwys, many thanks for the suggestions!
   
   From my view, the main blocker for directly performing the checkpoint is subsuming or waiting for the previous checkpoints. For example, suppose we have
   
   ```
   A ---|
        |--> C
   B ---|
   ```
   
   We may run into a case like this:
   
   1. A emits Barrier 5.
   2. A finishes and emits EndOfPartition, but C has not received it yet.
   3. B finishes and emits EndOfPartition, but C has not received it yet.
   4. The CheckpointCoordinator notifies C of checkpoint 6.
   
   In this case we either wait for checkpoint 5 to finish and then trigger checkpoint 6, or subsume checkpoint 5 and trigger checkpoint 6. But in both cases only the `CheckpointBarrierHandler` knows that checkpoint 5 exists, so it seems we have to notify the `CheckpointBarrierHandler` in some way?
   
   One possible alternative might be to let the `CheckpointBarrierHandler` first check on alignment whether the checkpoint is outdated and, if so, abort it. The drawback of this method is that the `CheckpointBarrierHandler` might do some useless work, and we would have two entry points to performCheckpoint() that are not fully independent.
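   
   To make the scenario concrete, below is a minimal, self-contained sketch of the "subsume" option. It is not Flink code: `SketchBarrierHandler`, `onBarrier` and `onRpcTrigger` are hypothetical names chosen for illustration, and real alignment tracks barriers per channel rather than with a single flag. The point is only that the component that saw Barrier 5 has to be the one that decides what happens when the RPC trigger for checkpoint 6 arrives.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical, heavily simplified stand-in for a barrier handler (not Flink's class). */
   class SketchBarrierHandler {
       private long latestCheckpointId = -1; // highest checkpoint id seen so far
       private boolean aligning = false;     // true while still waiting for more barriers
       final List<String> events = new ArrayList<>();
   
       /** Called when a barrier arrives on one of several input channels. */
       void onBarrier(long checkpointId) {
           if (checkpointId <= latestCheckpointId) {
               return; // outdated barrier, ignore
           }
           latestCheckpointId = checkpointId;
           aligning = true;
           events.add("start alignment " + checkpointId);
       }
   
       /** Called when the coordinator triggers the task directly via RPC. */
       boolean onRpcTrigger(long checkpointId) {
           if (checkpointId <= latestCheckpointId) {
               events.add("ignore outdated trigger " + checkpointId);
               return false;
           }
           if (aligning) {
               // Only the handler knows that an older checkpoint is still pending.
               events.add("abort alignment " + latestCheckpointId);
               aligning = false;
           }
           latestCheckpointId = checkpointId;
           events.add("perform checkpoint " + checkpointId);
           return true;
       }
   }
   
   class SubsumeSketch {
       public static void main(String[] args) {
           SketchBarrierHandler handler = new SketchBarrierHandler();
           handler.onBarrier(5);    // step 1: Barrier 5 from A reaches C
           handler.onRpcTrigger(6); // step 4: the coordinator triggers checkpoint 6
           System.out.println(handler.events);
       }
   }
   ```
   
   If the task performed checkpoint 6 without going through such a handler, the pending alignment for checkpoint 5 would stay open, which is the problem described above.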
   





[GitHub] [flink] dawidwys edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-857734904


   All right, I will need to think about it a bit more, but even assuming we can receive an RPC signal while the input channels are still active/non-finished, I feel we can make it easier. I don't think there is a big enough difference between source and non-source tasks to justify extracting a separate hierarchy. The way I see it, the only difference is whether all the input gates have finished or not. (If there are no input gates, all of them are obviously finished ;) ). In that scenario we could extend `triggerCheckpointAsyncInMailbox` (it was renamed on master) to handle both cases quite easily.
   
   I think we don't need to expose the `SingleCheckpointBarrierHandler` in any way. Either with my approach or yours you would need to inject barriers on the RPC call into the channels/or drain the channels somehow, right? Otherwise you cannot checkpoint the data/align with the data in the channels properly. Or am I missing something? Therefore you could inject the barrier into channel/drain the channel from the `StreamTask` level and the logic in `SingleCheckpointBarrierHandler` would still be valid.
   
   Let me know what you think.
   
   StreamTask.java:
   ```
       @Override
       public Future<Boolean> triggerCheckpointAsync(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
   
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           mainMailboxExecutor.execute(
                   () -> {
                       try {
                           result.complete(
                                   triggerCheckpointAsyncInMailbox(
                                           checkpointMetaData, checkpointOptions));
                       } catch (Exception ex) {
                           // Report the failure both via the Future result but also to the mailbox
                           result.completeExceptionally(ex);
                           throw ex;
                       }
                   },
                   "checkpoint %s with %s",
                   checkpointMetaData,
                   checkpointOptions);
           return result;
       }
   
       private boolean triggerCheckpointAsyncInMailbox(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
           try {
               if (allInputsFinished()) {
                   return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
               } else {
                   throw new UnsupportedOperationException(
                           "We do not support triggering non root nodes yet.");
               }
           } catch (Exception e) {
               // propagate exceptions only if the task is still in "running" state
               if (isRunning) {
                   throw new Exception(
                           "Could not perform checkpoint "
                                   + checkpointMetaData.getCheckpointId()
                                   + " for operator "
                                   + getName()
                                   + '.',
                           e);
               } else {
                   LOG.debug(
                           "Could not perform checkpoint {} for operator {} while the "
                                   + "invokable was not in state running.",
                           checkpointMetaData.getCheckpointId(),
                           getName(),
                           e);
                   return false;
               }
           } finally {
               FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
           }
       }
   
       private boolean allInputsFinished() {
           // we could add some caching if allInputGates have finished
           return getEnvironment().getAllInputGates().length == 0
                   || Arrays.stream(getEnvironment().getAllInputGates())
                           .allMatch(InputGate::isFinished);
       }
   
       private boolean triggerCheckpointInRootNode(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           latestAsyncCheckpointStartDelayNanos =
                   1_000_000
                           * Math.max(
                                   0, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
   
           // No alignment if we inject a checkpoint
           CheckpointMetricsBuilder checkpointMetrics =
                   new CheckpointMetricsBuilder()
                           .setAlignmentDurationNanos(0L)
                           .setBytesProcessedDuringAlignment(0L)
                           .setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
   
           subtaskCheckpointCoordinator.initInputsCheckpoint(
                   checkpointMetaData.getCheckpointId(), checkpointOptions);
   
           boolean success =
                   performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
           if (!success) {
               declineCheckpoint(checkpointMetaData.getCheckpointId());
           }
           return success;
       }
   ```





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570028569



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       The main reason for not making it abstract is that `MultipleInputStreamTask` overrides `triggerCheckpointAsync`; if we made `triggerCheckpoint` abstract, it would have to implement an empty method that it never uses. If we also want `MultipleInputStreamTask` to no longer override `triggerCheckpointAsync` directly, we would have to pass the completable future as a parameter to `triggerCheckpoint` and let `triggerCheckpoint` complete the future, instead of hiding it from `triggerCheckpoint`.
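       
       A rough sketch of that second option, assuming simplified signatures: the `Sketch*` classes below are hypothetical stand-ins rather than the real `StreamTask` hierarchy, the mailbox executor is left out, and `sourcesDone` is an invented placeholder for whatever a multiple-input task has to wait for. The base class creates the future and hands it to an abstract `triggerCheckpoint`, so a subclass can complete it immediately or later.
       
       ```java
       import java.util.concurrent.CompletableFuture;
       
       /** Hypothetical stand-in for the task base class. */
       abstract class SketchStreamTask {
           final CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
               CompletableFuture<Boolean> result = new CompletableFuture<>();
               try {
                   // The subclass is responsible for completing the future.
                   triggerCheckpoint(checkpointId, result);
               } catch (Exception ex) {
                   result.completeExceptionally(ex);
               }
               return result;
           }
       
           protected abstract void triggerCheckpoint(
                   long checkpointId, CompletableFuture<Boolean> result) throws Exception;
       }
       
       /** Completes the future right away. */
       class SketchOneInputTask extends SketchStreamTask {
           @Override
           protected void triggerCheckpoint(long checkpointId, CompletableFuture<Boolean> result) {
               result.complete(true);
           }
       }
       
       /** Completes the future only once some other asynchronous step has finished. */
       class SketchMultipleInputTask extends SketchStreamTask {
           final CompletableFuture<Boolean> sourcesDone = new CompletableFuture<>();
       
           @Override
           protected void triggerCheckpoint(long checkpointId, CompletableFuture<Boolean> result) {
               sourcesDone.thenAccept(result::complete);
           }
       }
       ```
       
       With this shape no subclass needs to override `triggerCheckpointAsync`, at the cost of exposing the future to every `triggerCheckpoint` implementation.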







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706",
       "triggerID" : "5043cb61111d95f503534b53bac93c451ae367e9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846",
       "triggerID" : "438f3d37438cd7d5128d4e92a128bbcf83952361",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862",
       "triggerID" : "48599d7654cbb03fb31bd28f9fc795a103870d81",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5043cb61111d95f503534b53bac93c451ae367e9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706) 
   * 438f3d37438cd7d5128d4e92a128bbcf83952361 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846) 
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r569539155



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestCheckpointBarrierHandler.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A test {@link TestCheckpointBarrierHandler} that records the history of checkpoint triggering.

Review comment:
       ```suggestion
    * A {@link CheckpointBarrierHandler} for testing that records the history of checkpoint triggering.
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       Why is this not turned into an `abstract` method?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -877,12 +875,6 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         mainMailboxExecutor.execute(
                 () -> {
-                    latestAsyncCheckpointStartDelayNanos =

Review comment:
       Why is this moved to the "inner" method? I found it a bit confusing that the "non-async" method now determines the start of the async part.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointableOneInputStreamTask.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+
+/** A test stream task that also response to the checkpoint trigger requirement. */
+public class CheckpointableOneInputStreamTask<IN, OUT> extends OneInputStreamTask<IN, OUT> {

Review comment:
       Why don't the tests work anymore with just the regular `OneInputStreamTask`? They also have an implemented `triggerCheckpoint()`.







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705",
       "triggerID" : "326d8751ac2aac4255b5e94da8e8f46a424c55c5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 326d8751ac2aac4255b5e94da8e8f46a424c55c5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12705) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 274dceea68cb81e8a2c20118fcc2ccf44fcb936b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090) 
   * 045971a4900dc6798be00108c8e639e1c77fa18c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770325573


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit eec04bb42f0535bc7db0c9de9c414801c8e622fb (Sat Aug 28 12:18:42 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   <details>
   <summary>Bot commands</summary>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * 48599d7654cbb03fb31bd28f9fc795a103870d81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12862) 
   * 7f98711c3de38815d24f2d35a5be5017dadad27c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12909) 
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   



[GitHub] [flink] dawidwys commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-855839593


   I see. Let me ask an additional question. The problem applies only to unaligned checkpoints (UC), right? With aligned checkpoints (AC) that situation is not possible, because aligned barriers do not overtake the data buffers and tasks won't finish before the final aligned checkpoint. Would it work if we forced such barriers to be aligned? I am trying to understand all the options here.





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 045971a4900dc6798be00108c8e639e1c77fa18c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940) 
   * 05c48a82487c41963697e99a31baeab9e3c82240 UNKNOWN
   



[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-858512129


   Hi Dawid, many thanks for the suggestion! After thinking about it some more, I think you are right: we would eventually drain the records from all the channels, so we could trigger the remaining checkpoints directly in the `StreamTask` and would not need to expose the `BarrierHandler`.
   
   With the above approach, it seems we would reject the checkpoints if not all channels have finished, right? Perhaps we could change it a bit and hold these checkpoints until all the channels are drained, namely:
   
   ```
   // Checkpoints triggered via RPC while some input channels are still unfinished.
   private final List<CheckpointBarrier> pendingCheckpoints = new ArrayList<>();
   
   private boolean triggerCheckpointAsyncInMailbox(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
           throws Exception {
       FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
       try {
           if (allInputsFinished()) {
               return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
           } else {
               // Hold the checkpoint until all channels are drained.
               pendingCheckpoints.add(
                       new CheckpointBarrier(
                               checkpointMetaData.getCheckpointId(),
                               checkpointMetaData.getTimestamp(),
                               checkpointOptions));
               return true; // assumption: a held checkpoint is reported as accepted
           }
       } catch (Exception e) {
          ...
       } finally {
           ...
       }
   }
   
   private void executeInvoke() throws Exception {
       runMailboxLoop();
       ensureNotCanceled();
   
       // All inputs are drained at this point, so the held checkpoints can run.
       triggerAllPendingCheckpoints();
   
       afterInvoke();
   }
   
   
   private void triggerAllPendingCheckpoints() throws Exception {
       for (CheckpointBarrier barrier : pendingCheckpoints) {
           triggerCheckpointInRootNode(
                   new CheckpointMetaData(barrier.getId(), barrier.getTimestamp()),
                   barrier.getCheckpointOptions());
       }
       pendingCheckpoints.clear();
   }
   
   ```
   
   Do you think this would be ok~?
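
The hold-and-release idea proposed above can be tried out in isolation with a small toy model. This is only a sketch under stated assumptions, not Flink code: `DeferredCheckpointSketch`, `CheckpointRequest`, `trigger` and `onInputFinished` are hypothetical names, and the model releases held checkpoints as soon as the last input finishes rather than after the mailbox loop returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code): hold RPC-triggered checkpoints until every input has finished. */
class DeferredCheckpointSketch {

    /** Hypothetical stand-in for a checkpoint request. */
    static final class CheckpointRequest {
        final long id;
        final long timestamp;

        CheckpointRequest(long id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private final List<CheckpointRequest> pending = new ArrayList<>();
    private final List<Long> performed = new ArrayList<>();
    private int unfinishedInputs;

    DeferredCheckpointSketch(int numberOfInputs) {
        this.unfinishedInputs = numberOfInputs;
    }

    /** Returns true if the checkpoint ran immediately, false if it was held back. */
    boolean trigger(CheckpointRequest request) {
        if (unfinishedInputs == 0) {
            performed.add(request.id);
            return true;
        }
        pending.add(request);
        return false;
    }

    /** Marks one input as finished; the last one releases all held checkpoints in order. */
    void onInputFinished() {
        if (unfinishedInputs == 0) {
            return;
        }
        unfinishedInputs--;
        if (unfinishedInputs == 0) {
            for (CheckpointRequest request : pending) {
                performed.add(request.id);
            }
            pending.clear();
        }
    }

    List<Long> performedCheckpoints() {
        return performed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        DeferredCheckpointSketch task = new DeferredCheckpointSketch(2);
        task.trigger(new CheckpointRequest(1L, 100L)); // held: two inputs still open
        task.onInputFinished();
        task.trigger(new CheckpointRequest(2L, 200L)); // held: one input still open
        task.onInputFinished();                        // releases the held checkpoints
        task.trigger(new CheckpointRequest(3L, 300L)); // runs immediately
        System.out.println(task.performedCheckpoints());
    }
}
```

Keeping the held requests in arrival order means they are performed in checkpoint-id order once the inputs are drained, which is the property the proposal relies on.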





[GitHub] [flink] dawidwys closed pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
dawidwys closed pull request #14820:
URL: https://github.com/apache/flink/pull/14820


   





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 045971a4900dc6798be00108c8e639e1c77fa18c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * 5043cb61111d95f503534b53bac93c451ae367e9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706) 
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r654164209



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1012,6 +1027,35 @@ private boolean triggerCheckpointAsyncInMailbox(
         }
     }
 
+    private boolean triggerUnfinishedChannelsCheckpoint(
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
+            throws Exception {
+        Optional<CheckpointBarrierHandler> checkpointBarrierHandler = getCheckpointBarrierHandler();
+        checkState(
+                checkpointBarrierHandler.isPresent(),
+                "CheckpointBarrier should exist for tasks with network inputs.");
+
+        CheckpointBarrier barrier =
+                new CheckpointBarrier(
+                        checkpointMetaData.getCheckpointId(),
+                        checkpointMetaData.getTimestamp(),
+                        checkpointOptions);
+
+        for (IndexedInputGate inputGate : getEnvironment().getAllInputGates()) {
+            if (!inputGate.isFinished()) {
+                for (InputChannelInfo channelInfo : inputGate.getUnfinishedChannels()) {
+                    checkpointBarrierHandler.get().processBarrier(barrier, channelInfo);
+                }
+            }
+        }
+
+        return true;
+    }
+
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {

Review comment:
       I updated the javadoc~
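
The loop in `triggerUnfinishedChannelsCheckpoint` above can be illustrated with a self-contained toy model. This is a sketch under stated assumptions, not Flink code: `Channel`, `RecordingHandler` and `injectBarrier` are hypothetical stand-ins for the input gates, the `CheckpointBarrierHandler` and the trigger method, and the handler here only records which channels were handed the barrier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code): feed one barrier to every input channel that is still open. */
class BarrierInjectionSketch {

    /** Hypothetical stand-in for an input channel, identified by gate and channel index. */
    static final class Channel {
        final int gateIndex;
        final int channelIndex;
        final boolean finished;

        Channel(int gateIndex, int channelIndex, boolean finished) {
            this.gateIndex = gateIndex;
            this.channelIndex = channelIndex;
            this.finished = finished;
        }

        @Override
        public String toString() {
            return gateIndex + "/" + channelIndex;
        }
    }

    /** Hypothetical handler that only records which channels saw the barrier. */
    static final class RecordingHandler {
        final List<String> seen = new ArrayList<>();

        void processBarrier(long checkpointId, Channel channel) {
            seen.add(checkpointId + "@" + channel);
        }
    }

    /** Hands the barrier to the handler once per unfinished channel; returns how many it reached. */
    static int injectBarrier(
            long checkpointId, List<List<Channel>> gates, RecordingHandler handler) {
        int injected = 0;
        for (List<Channel> gate : gates) {
            for (Channel channel : gate) {
                if (!channel.finished) {
                    handler.processBarrier(checkpointId, channel);
                    injected++;
                }
            }
        }
        return injected;
    }

    public static void main(String[] args) {
        List<List<Channel>> gates =
                Arrays.asList(
                        Arrays.asList(new Channel(0, 0, true), new Channel(0, 1, false)),
                        Arrays.asList(new Channel(1, 0, false)));
        RecordingHandler handler = new RecordingHandler();
        int injected = injectBarrier(7L, gates, handler);
        System.out.println(injected + " barriers injected: " + handler.seen);
    }
}
```

Finished channels are skipped because they can no longer deliver a barrier themselves; only the open ones need the synthetic barrier for the handler to treat the checkpoint as seen on every remaining input.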

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1012,6 +1027,35 @@ private boolean triggerCheckpointAsyncInMailbox(
         }
     }
 
+    private boolean triggerUnfinishedChannelsCheckpoint(
+            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
+            throws Exception {
+        Optional<CheckpointBarrierHandler> checkpointBarrierHandler = getCheckpointBarrierHandler();
+        checkState(
+                checkpointBarrierHandler.isPresent(),
+                "CheckpointBarrier should exist for tasks with network inputs.");
+
+        CheckpointBarrier barrier =
+                new CheckpointBarrier(
+                        checkpointMetaData.getCheckpointId(),
+                        checkpointMetaData.getTimestamp(),
+                        checkpointOptions);
+
+        for (IndexedInputGate inputGate : getEnvironment().getAllInputGates()) {
+            if (!inputGate.isFinished()) {
+                for (InputChannelInfo channelInfo : inputGate.getUnfinishedChannels()) {
+                    checkpointBarrierHandler.get().processBarrier(barrier, channelInfo);
+                }
+            }
+        }
+
+        return true;
+    }
+
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {

Review comment:
       I added the javadoc~

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -175,10 +176,23 @@ protected void createInputProcessor(
                         getEnvironment().getTaskInfo());
     }
 
+    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
+        return Optional.ofNullable(checkpointBarrierHandler);
+    }
+
     @Override
     public Future<Boolean> triggerCheckpointAsync(
             CheckpointMetaData metadata, CheckpointOptions options) {
 
+        if (operatorChain.getSourceTaskInputs().size() == 0) {
+            return super.triggerCheckpointAsync(metadata, options);
+        }
+
+        // If there are chained sources, we would always only trigger
+        // the chained sources for checkpoint. This means that for
+        // the checkpoints during the upstream task finished and
+        // this task receives the EndOfPartitionEvent, the checkpoint
+        // would not subsume the pending ones.

Review comment:
       Yes, we should indeed also be able to subsume the checkpoints in this case. The actual difference is that we would not insert barriers for the unfinished network inputs, so the task needs to wait for the EndOfPartitionEvent. I'll fix the comment~

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -77,6 +77,12 @@
 
     private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
 
+    /**
+     * TODO Whether enables checkpoints after tasks finished. This is a temporary flag and will be
+     * removed in the last PR.
+     */
+    protected boolean enableCheckpointAfterTasksFinished;

Review comment:
       This was a mistake... I changed it to private.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1723,6 +1725,86 @@ protected void cancelTask() {
         assertTrue(OpenFailingOperator.wasClosed);
     }
 
+    @Test
+    public void testTriggeringCheckpointWithFinishedChannels() throws Exception {
+        OneInputStreamTaskTestHarness<String, String> testHarness =

Review comment:
       I updated the test~ 







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 05c48a82487c41963697e99a31baeab9e3c82240 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478) 
   * 061dc5ee35fdfaf474382cf6c4e385c7d70258b3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 274dceea68cb81e8a2c20118fcc2ccf44fcb936b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13090) 
   * 045971a4900dc6798be00108c8e639e1c77fa18c UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 05c48a82487c41963697e99a31baeab9e3c82240 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478) 
   * 061dc5ee35fdfaf474382cf6c4e385c7d70258b3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18993) 
   



[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-861368486


   Hi @dawidwys, I updated the PR according to the above discussion. A few points came up during the discussion that might need consideration:
   
   1. For `MultipleInputStreamTask`, if there is no chained source, it would behave like an ordinary `StreamTask`. But if there are chained sources, the task would always get triggered since it is also a kind of source, and we would always trigger only the chained sources to initiate the checkpoint on the task side. Then, if the checkpoint is triggered after the upstream tasks have finished but before `MultipleInputStreamTask` has received the `EndOfPartitionEvent`, we could not subsume the pending checkpoints as the other `StreamTask`s do. But since the only possible records in this case are `EndOfPartitionEvent` and `CheckpointBarrier`, logically they would not be delayed for long, and the effect only lasts for a short period, so I think it might be acceptable. Do you think this would be a problem?
   2. Regarding how to acquire the unfinished channels from `IndexInputGate`, returning an `Iterable<InputChannelInfo>` or passing a `Consumer<InputChannelInfo>` could avoid allocating additional memory, but neither works well given that we need to access `channelsWithEndOfPartitionEvents` within the lock scope. Thus I think we might still need to return a copied list of the unfinished input channels.
   
   Many thanks for the review!
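
   The trade-off in point 2 can be illustrated with a minimal sketch. This is not the code in the PR: the class `UnfinishedChannelsSketch`, its `lock` field, and the use of plain channel indices instead of `InputChannelInfo` are all assumptions made for illustration. It only shows why a copied list is convenient when the finished-channel state must be read while holding a lock.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Hypothetical stand-in for an input gate; not Flink's actual class. */
final class UnfinishedChannelsSketch {
    // Assumed lock object guarding the finished-channel state.
    private final Object lock = new Object();
    private final int numberOfChannels;
    // Bit i is set once channel i has delivered its end-of-partition event.
    private final BitSet channelsWithEndOfPartitionEvents = new BitSet();

    UnfinishedChannelsSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    void markFinished(int channelIndex) {
        synchronized (lock) {
            channelsWithEndOfPartitionEvents.set(channelIndex);
        }
    }

    /** Builds the result entirely inside the lock and hands out a copy. */
    List<Integer> getUnfinishedChannels() {
        synchronized (lock) {
            List<Integer> unfinished = new ArrayList<>();
            for (int i = 0; i < numberOfChannels; i++) {
                if (!channelsWithEndOfPartitionEvents.get(i)) {
                    unfinished.add(i);
                }
            }
            return unfinished;
        }
    }
}
```

   A lazily evaluated `Iterable` or a `Consumer` callback would instead have to either read the guarded state after the lock is released or run caller code while the lock is held, which is the difficulty described above. The copy costs one allocation per call in exchange for a stable snapshot.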





[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-863783480









[GitHub] [flink] gaoyunhaii commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-855904191


   Hi @dawidwys, I think this might still happen in AC, since:
   
   1. For the final version, not all tasks might need to wait for the last aligned checkpoint, since some tasks do not have operators using 2PC.
   2. There might still be some cases even if `A` and `B` both need to wait for the last checkpoint. Suppose they are both waiting for checkpoint 4; with concurrent checkpoints we might start triggering checkpoint 5. However, the network stalls after barrier 4 is emitted. Then checkpoint 4 finally completes, and `A` and `B` are notified so that they emit `EndOfPartition` and exit. If the JM then triggers checkpoint 6, we would have a similar case.
   





[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 045971a4900dc6798be00108c8e639e1c77fa18c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13940) 
   * 05c48a82487c41963697e99a31baeab9e3c82240 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18478) 
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14820:
URL: https://github.com/apache/flink/pull/14820#discussion_r570059373



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -901,54 +893,12 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
         return result;
     }
 
-    private boolean triggerCheckpoint(
+    protected boolean triggerCheckpoint(

Review comment:
       I also committed a diff to show the changes if we make `triggerCheckpoint` abstract: https://github.com/apache/flink/pull/14820/commits/83de6dda8fab862126c1e91010141828b93fe466
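
       As a rough illustration of the abstract variant (not the linked commit itself), the sketch below uses hypothetical classes `TaskSketch`, `SourceTaskSketch`, and `NonSourceTaskSketch`. The simplified signature and the `log` list are assumptions for illustration only. It shows each task type deciding what an RPC trigger means for it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base task; the real method takes more parameters. */
abstract class TaskSketch {
    // Records what each trigger did, purely for illustration.
    final List<String> log = new ArrayList<>();

    /** Every concrete task type must define its own reaction to an RPC trigger. */
    abstract boolean triggerCheckpoint(long checkpointId);
}

/** A source-like task performs the checkpoint directly on trigger. */
final class SourceTaskSketch extends TaskSketch {
    @Override
    boolean triggerCheckpoint(long checkpointId) {
        log.add("perform checkpoint " + checkpointId);
        return true;
    }
}

/** A non-source task hands the trigger to its barrier handler instead. */
final class NonSourceTaskSketch extends TaskSketch {
    @Override
    boolean triggerCheckpoint(long checkpointId) {
        log.add("notify barrier handler for checkpoint " + checkpointId);
        return true;
    }
}
```

       With an abstract method, the compiler forces every task type to state its behavior explicitly, whereas a `protected` default leaves room for a subclass to silently inherit the source-style behavior.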







[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * 5043cb61111d95f503534b53bac93c451ae367e9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12706) 
   * 438f3d37438cd7d5128d4e92a128bbcf83952361 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12846) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * d83bee7799051ef32e0f9ec9e26339b23b7dc057 UNKNOWN
   * 42cdc662ef4b6416b30017a5c3cb090ee66f7d07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12924) 
   * 274dceea68cb81e8a2c20118fcc2ccf44fcb936b UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770325573


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 326d8751ac2aac4255b5e94da8e8f46a424c55c5 (Sun Jan 31 04:51:56 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-21085).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot commented on pull request #14820: [FLINK-21085][runtime][checkpoint] Allows triggering checkpoint barrier handler for non-source tasks

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-770327711


   ## CI report:
   
   * 326d8751ac2aac4255b5e94da8e8f46a424c55c5 UNKNOWN
   