Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/23 14:30:54 UTC

[GitHub] [flink] gaoyunhaii opened a new pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

gaoyunhaii opened a new pull request #14734:
URL: https://github.com/apache/flink/pull/14734


   ## What is the purpose of the change
   
   This PR refactors the `CheckpointCoordinator` logic to compute the tasks to trigger/wait for/commit dynamically. Previously this was done during the compile phase and the result was passed to the `CheckpointCoordinator` via the `JobGraph`. The refactor is a prerequisite for taking finished tasks into account during checkpointing: once finished tasks are considered, each checkpoint might need to trigger different tasks, so the task sets have to be re-computed at each checkpoint.
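
   As a rough illustration of the idea in the description above (not the code in this PR), the sketch below recomputes the task sets from the tasks' current state each time a checkpoint starts. All class names are hypothetical stand-ins rather than Flink classes, and the rule that finished tasks are simply skipped is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Flink classes.
enum SketchTaskState { RUNNING, FINISHED }

final class SketchTask {
    final String name;
    final boolean isSource;
    SketchTaskState state = SketchTaskState.RUNNING;

    SketchTask(String name, boolean isSource) {
        this.name = name;
        this.isSource = isSource;
    }
}

/** Read-only result of one computation: which tasks to trigger, wait for and commit to. */
final class SketchPlan {
    final List<SketchTask> toTrigger;
    final List<SketchTask> toWaitFor;
    final List<SketchTask> toCommitTo;

    SketchPlan(List<SketchTask> toTrigger, List<SketchTask> toWaitFor, List<SketchTask> toCommitTo) {
        this.toTrigger = Collections.unmodifiableList(toTrigger);
        this.toWaitFor = Collections.unmodifiableList(toWaitFor);
        this.toCommitTo = Collections.unmodifiableList(toCommitTo);
    }
}

final class SketchPlanCalculator {
    private final List<SketchTask> allTasks;

    SketchPlanCalculator(List<SketchTask> allTasks) {
        this.allTasks = new ArrayList<>(allTasks);
    }

    /** Called once per checkpoint, so the result reflects the tasks' state at that moment. */
    SketchPlan calculate() {
        List<SketchTask> toTrigger = new ArrayList<>();
        List<SketchTask> toWaitFor = new ArrayList<>();
        for (SketchTask task : allTasks) {
            if (task.state == SketchTaskState.FINISHED) {
                continue; // assumption: finished tasks take no part in this checkpoint
            }
            toWaitFor.add(task);
            if (task.isSource) {
                toTrigger.add(task);
            }
        }
        // In this sketch the tasks to commit to are the same as the tasks to wait for.
        return new SketchPlan(toTrigger, toWaitFor, new ArrayList<>(toWaitFor));
    }
}
```

   Calling `calculate()` before and after a task finishes yields different plans, which is why a statically computed list would no longer be sufficient.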
   
   
   ## Brief change log
   
   - 721dd38bb286722cbb1a6925dbedbb3614930b20 did the refactor.
   
   ## Verifying this change
   
   This change is a code refactor and could be covered by existing tests. 
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766088540


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 721dd38bb286722cbb1a6925dbedbb3614930b20 (Sat Jan 23 14:33:14 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>








[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567541421



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                             createPendingCheckpoint(
                                                     timestamp,
                                                     request.props,
-                                                    ackTasks,
+                                                    checkpointBrief,

Review comment:
       What I understand is that the `CheckpointBrief` describes how to “get” a `CompletedCheckpoint`. There are some methods related to “getting” a `CompletedCheckpoint`, such as
   `createPendingCheckpoint`/`reportToStateTracker`/`sendAbortedMessage`/`sendAcknowledgeMessages`.
   
   However, these methods sometimes use `CheckpointBrief` and sometimes use `tasksToWaitFor`/`tasksToTriggerFor`, which are internal state of the `CheckpointCoordinator`. After the refactor, maybe all of these methods could take only the `CheckpointBrief` as their parameter. (I think the checkpoint ID and timestamp should also be in the `CheckpointBrief`.)
   
   WDYT?
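
   A minimal sketch of what the suggestion above might look like, assuming a single parameter object that also carries the checkpoint ID and timestamp. The class names, field types and message strings are hypothetical; only the method names are taken from the comment, and their bodies are stubs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parameter object; names and types are illustrative, not Flink's.
final class BriefParamSketch {
    final long checkpointId;
    final long timestamp;
    final List<String> tasksToTrigger;
    final List<String> tasksToWaitFor;
    final List<String> tasksToCommitTo;

    BriefParamSketch(
            long checkpointId,
            long timestamp,
            List<String> tasksToTrigger,
            List<String> tasksToWaitFor,
            List<String> tasksToCommitTo) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.tasksToTrigger = Collections.unmodifiableList(new ArrayList<>(tasksToTrigger));
        this.tasksToWaitFor = Collections.unmodifiableList(new ArrayList<>(tasksToWaitFor));
        this.tasksToCommitTo = Collections.unmodifiableList(new ArrayList<>(tasksToCommitTo));
    }
}

// Stub coordinator: every method reads what it needs from the one parameter
// instead of from fields of the coordinator itself.
final class CoordinatorSketch {
    final List<String> sent = new ArrayList<>();

    void sendAcknowledgeMessages(BriefParamSketch brief) {
        for (String task : brief.tasksToCommitTo) {
            sent.add("complete:" + brief.checkpointId + ":" + task);
        }
    }

    void sendAbortedMessage(BriefParamSketch brief) {
        for (String task : brief.tasksToCommitTo) {
            sent.add("abort:" + brief.checkpointId + ":" + task);
        }
    }
}
```

   With this shape, two checkpoints in flight can carry different task sets without the coordinator holding either set as its own state.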







[GitHub] [flink] rkhachatryan commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570599733



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                         checkpointsCleaner,
                         new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        failureManager);
+                        failureManager,
+                        new CheckpointBriefCalculator(
+                                getJobID(),
+                                sourceAndAllVertices.f0,
+                                sourceAndAllVertices.f1,
+                                sourceAndAllVertices.f1),
+                        new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
       I share your concern and I also like your proposal :+1: 







[GitHub] [flink] rkhachatryan commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r569486717



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {

Review comment:
       To me, the name `CheckpointBrief` is a bit ambiguous.
   How about `CheckpointTasks`, `CheckpointPlan`, `CheckpointSpec` (the latter overlaps with options though)?
   
   edit: is `public` really necessary here?
   
   ditto: `CheckpointBriefCalculator`







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744",
       "triggerID" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "916cc586e1ba5a0ecdb3941d2706a4ead1816af5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12934",
       "triggerID" : "916cc586e1ba5a0ecdb3941d2706a4ead1816af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c1ecfa3e7a8bd5a6195009dcad6207881a15a077",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c1ecfa3e7a8bd5a6195009dcad6207881a15a077",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   * 916cc586e1ba5a0ecdb3941d2706a4ead1816af5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12934) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567838726



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responses to query the current attempt for the tasks and provide the mapping from the execution
+ * attempt id to its task.
+ */
+public class ExecutionAttemptMappingProvider {
+
+    private final List<ExecutionVertex> tasks;

Review comment:
       Ok, makes sense. Please leave it as is 👌
   







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565260718



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
##########
@@ -197,22 +197,13 @@ public static ExecutionGraph buildGraph(
         // configure the state checkpointing
         if (isCheckpointingEnabled(jobGraph)) {
             JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
-            List<ExecutionJobVertex> triggerVertices =
-                    idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
-
-            List<ExecutionJobVertex> ackVertices =
-                    idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
-
-            List<ExecutionJobVertex> confirmVertices =
-                    idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
 

Review comment:
       `idToVertex` could be removed.







[GitHub] [flink] rkhachatryan commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r569486717



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {

Review comment:
       To me, the name `CheckpointBrief` is a bit ambiguous.
   How about `CheckpointTasks`, `CheckpointPlan`, `CheckpointSpec` (the latter overlaps with options though)?
   
   ditto: `CheckpointBriefCalculator`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
       I think we should do that (make it unmodifiable here and copy in `PendingCheckpoint`). I expect `CheckpointBrief` to be immutable.
   
   Besides, this class isn't concerned about mutability of these three structures: two are newly created, one is already immutable (`tasksToCommitTo`).
   So I'd move wrapping with `unmodifiable` to the `CheckpointBrief` constructor.
   
   nit: inline variables?
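
   One way to read the suggestion above, as a sketch under assumptions: the brief wraps its collections with `unmodifiable` in its own constructor, and the pending checkpoint takes a private mutable copy of the tasks it still waits for. Class names and the `String` element types are hypothetical simplifications.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplification: strings stand in for executions and vertices.
final class ImmutableBriefSketch {
    private final List<String> tasksToTrigger;
    private final Map<String, String> tasksToWaitFor; // attempt id -> task name
    private final List<String> tasksToCommitTo;

    ImmutableBriefSketch(
            List<String> tasksToTrigger,
            Map<String, String> tasksToWaitFor,
            List<String> tasksToCommitTo) {
        // The wrapping happens here, so callers do not have to remember to do it.
        this.tasksToTrigger = Collections.unmodifiableList(Objects.requireNonNull(tasksToTrigger));
        this.tasksToWaitFor = Collections.unmodifiableMap(Objects.requireNonNull(tasksToWaitFor));
        this.tasksToCommitTo = Collections.unmodifiableList(Objects.requireNonNull(tasksToCommitTo));
    }

    List<String> getTasksToTrigger() {
        return tasksToTrigger;
    }

    Map<String, String> getTasksToWaitFor() {
        return tasksToWaitFor;
    }

    List<String> getTasksToCommitTo() {
        return tasksToCommitTo;
    }
}

final class PendingSketch {
    // Private mutable copy: acknowledgements shrink this map, never the brief's.
    private final Map<String, String> notYetAcknowledgedTasks;

    PendingSketch(ImmutableBriefSketch brief) {
        this.notYetAcknowledgedTasks = new HashMap<>(brief.getTasksToWaitFor());
    }

    boolean acknowledge(String attemptId) {
        return notYetAcknowledgedTasks.remove(attemptId) != null;
    }

    int remaining() {
        return notYetAcknowledgedTasks.size();
    }
}
```

   Note that `unmodifiable` wrappers are views: if the caller keeps and mutates the original collection, the change is still visible through the brief, so a defensive copy in the constructor would be needed for full immutability.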

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
 
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
       The interaction between cache in `CheckpointCoordinator` and `ExecutionAttemptMappingProvider` seems a bit fragile to me. And they both have kind of overlapping responsibilities.
   
   What do you think about moving the cache itself to the new `ExecutionAttemptMappingProvider`? 
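
   A possible shape for that, sketched under assumptions: the provider owns a `LinkedHashMap` bounded by the number of tasks and refreshes it on a cache miss. `VertexSketch`, `AttemptMappingSketch` and the `String` attempt IDs are hypothetical stand-ins, and the refresh-on-miss policy is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a vertex whose current attempt changes on restart.
final class VertexSketch {
    final String name;
    String currentAttemptId;

    VertexSketch(String name, String currentAttemptId) {
        this.name = name;
        this.currentAttemptId = currentAttemptId;
    }
}

final class AttemptMappingSketch {
    private final List<VertexSketch> tasks;
    private final Map<String, VertexSketch> cachedTasksById;

    AttemptMappingSketch(List<VertexSketch> tasks) {
        this.tasks = new ArrayList<>(tasks);
        final int capacity = this.tasks.size();
        // Bounded cache: once it grows past the number of tasks, the oldest entry is dropped.
        this.cachedTasksById =
                new LinkedHashMap<String, VertexSketch>(capacity) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, VertexSketch> eldest) {
                        return size() > capacity;
                    }
                };
    }

    /** Looks up the vertex for an attempt ID, refreshing the cache once on a miss. */
    Optional<VertexSketch> getVertex(String attemptId) {
        if (!cachedTasksById.containsKey(attemptId)) {
            for (VertexSketch vertex : tasks) {
                cachedTasksById.put(vertex.currentAttemptId, vertex);
            }
        }
        return Optional.ofNullable(cachedTasksById.get(attemptId));
    }

    int cacheSize() {
        return cachedTasksById.size();
    }
}
```

   Keeping the cache and its size bound in one class means the coordinator no longer needs to know the number of tasks.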

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                         checkpointsCleaner,
                         new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        failureManager);
+                        failureManager,
+                        new CheckpointBriefCalculator(
+                                getJobID(),
+                                sourceAndAllVertices.f0,
+                                sourceAndAllVertices.f1,
+                                sourceAndAllVertices.f1),
+                        new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
       I think `ExecutionGraph` needs `getSourceAndAllVertices()` only to construct these two objects.
   Why not move this method to `CheckpointBriefCalculator`?
   I guess it will also make future steps easier (dynamic sources calculation).
   
   `ExecutionAttemptMappingProvider` can then depend on `CheckpointBriefCalculator`.
   
   WDYT?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -142,6 +145,7 @@ public PendingCheckpoint(
         this.checkpointId = checkpointId;
         this.checkpointTimestamp = checkpointTimestamp;
         this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+        this.tasksToCommitTo = checkNotNull(tasksToCommitTo);

Review comment:
       nit: one step further would be to store `CheckpointBrief` in `PendingCheckpoint` instead of collections. But that's probably out of scope of this PR







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567826582



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2138,12 +2080,10 @@ private void reportToStatsTracker(
         }
         Map<JobVertexID, Integer> vertices =
                 tasks.values().stream()
-                        .map(ExecutionVertex::getJobVertex)
-                        .distinct()
                         .collect(
-                                toMap(
-                                        ExecutionJobVertex::getJobVertexId,
-                                        ExecutionJobVertex::getParallelism));
+                                Collectors.groupingBy(
+                                        ExecutionVertex::getJobvertexId,
+                                        Collectors.reducing(0, e -> 1, Integer::sum)));

Review comment:
       This change prepares for the case where some tasks are finished. In that case `tasksToWaitFor` might not contain all the job vertices, and we would need to compute the result from both `tasksToWaitFor` and `finishedTasks`. It is more suitable to make this change in the next PR, though, so I'll revert it for now.
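
   For reference, a minimal sketch of the counting idiom used in the diff above, with a hypothetical `Task` class standing in for `ExecutionVertex`. Unlike reading a vertex's parallelism, it counts only the tasks actually present in the input collection.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SubtaskCount {

    /** Hypothetical stand-in for a subtask that knows the id of its owning vertex. */
    public static final class Task {
        private final String vertexId;

        public Task(String vertexId) {
            this.vertexId = vertexId;
        }

        public String getVertexId() {
            return vertexId;
        }
    }

    /** Groups tasks by vertex id and counts how many tasks fall under each id. */
    public static Map<String, Integer> countPerVertex(List<Task> tasks) {
        return tasks.stream()
                .collect(
                        Collectors.groupingBy(
                                Task::getVertexId,
                                // Map every task to 1 and sum the ones per group.
                                Collectors.reducing(0, t -> 1, Integer::sum)));
    }
}
```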







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566909909



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -730,7 +715,8 @@ private PendingCheckpoint createPendingCheckpoint(
 
         if (statsTracker != null) {
             PendingCheckpointStats callback =
-                    statsTracker.reportPendingCheckpoint(checkpointID, timestamp, props);
+                    statsTracker.reportPendingCheckpoint(

Review comment:
       Got it, I agree. Since master has evolved to use `tasksToWaitFor`, I just kept the latest behavior in this PR.







[GitHub] [flink] gaoyunhaii commented on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-773262082


   Hi Roman @rkhachatryan, many thanks for the comments; they make the PR much clearer. I updated the PR accordingly.





[GitHub] [flink] flinkbot commented on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 721dd38bb286722cbb1a6925dbedbb3614930b20 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566163264



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -448,14 +441,24 @@ public void failJobDueToTaskFailure(
                         new DispatcherThreadFactory(
                                 Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
 
+        List<ExecutionVertex> sourceVertices = new ArrayList<>();

Review comment:
       You could move it to the Computer or put it in a separate method in `ExecutionGraph`, just to make this method here smaller, because it's already quite big.







[GitHub] [flink] guoweiM closed pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM closed pull request #14734:
URL: https://github.com/apache/flink/pull/14734


   





[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565277280



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefComputer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefComputer {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefComputer.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommit;
+
+    public CheckpointBriefComputer(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommit) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommit = Collections.unmodifiableList(tasksToCommit);
+    }
+
+    public CheckpointBrief computeCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
+
+        return new CheckpointBrief(tasksToTrigger, ackTasks, tasksToCommit);
+    }
+
+    private List<Execution> getTriggerExecutions() throws CheckpointException {

Review comment:
       I think this method could have comments just like `getAckTasks`. :-)







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567825103



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1935,6 +1935,7 @@ private void abortPendingCheckpoint(
                 }
             } finally {
                 sendAbortedMessages(
+                        pendingCheckpoint.getTasksToCommitTo(),

Review comment:
       Based on the current implementation, `tasksToWaitFor` should always be equivalent to `tasksToCommitTo`. I used `tasksToCommitTo` mainly to keep the logic unchanged and to be symmetric with `sendCommitMessages`.







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567749554



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
       We cannot mark `tasksToWait` as read-only, since `PendingCheckpoint` reuses it to record the not-yet-acknowledged tasks and modifies it when tasks report their snapshots. If we changed it to read-only, we would need to clone it when creating `PendingCheckpoint`.
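
   To make the trade-off concrete, here is a minimal sketch of the cloning alternative: a read-only plan plus a mutable copy held by the pending checkpoint. `Plan` and `Pending` are hypothetical simplifications (plain strings instead of `ExecutionAttemptID` and `ExecutionVertex`), not the PR's classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PlanCopyExample {

    /** Hypothetical immutable plan: exposes only a read-only view of the tasks to ack. */
    public static final class Plan {
        private final Map<String, String> tasksToAck;

        public Plan(Map<String, String> tasksToAck) {
            // Copy first so later changes to the caller's map cannot leak in,
            // then wrap so that holders of the plan cannot modify it.
            this.tasksToAck = Collections.unmodifiableMap(new HashMap<>(tasksToAck));
        }

        public Map<String, String> getTasksToAck() {
            return tasksToAck;
        }
    }

    /** Hypothetical pending checkpoint: tracks acknowledgements on its own copy. */
    public static final class Pending {
        private final Map<String, String> notYetAcknowledged;

        public Pending(Plan plan) {
            // The clone mentioned above: one extra map per checkpoint.
            this.notYetAcknowledged = new HashMap<>(plan.getTasksToAck());
        }

        /** Returns true if the attempt was still awaiting acknowledgement. */
        public boolean acknowledge(String attemptId) {
            return notYetAcknowledged.remove(attemptId) != null;
        }

        public boolean isFullyAcknowledged() {
            return notYetAcknowledged.isEmpty();
        }
    }
}
```

   The cost is one additional map per checkpoint; in exchange the plan stays unchanged while acknowledgements arrive.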







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 721dd38bb286722cbb1a6925dbedbb3614930b20 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410) 
   





[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565315896



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {
+
+    /** Tasks who need to be sent a message when a checkpoint is started. */
+    private final List<Execution> tasksToTrigger;
+
+    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
+    private final Map<ExecutionAttemptID, ExecutionVertex> tasksToAck;
+
+    /**
+     * Tasks that are still running when taking the checkpoint, these need to be sent a message when
+     * the checkpoint is confirmed.
+     */
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       Maybe we should rename `runningTasks` to `tasksToCommitTo`. This PR is a refactoring, so maybe we should keep the concepts consistent first. What do you think?
   







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 UNKNOWN
   





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570164405



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {

Review comment:
       I would prefer `CheckpointPlan`, since it is easier to understand and does not have overlaps. I changed it to `CheckpointPlan`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
       I also agree that making it unmodifiable would be cleaner and less error-prone. So I made it unmodifiable and copy it in `PendingCheckpoint`.
   
   I also inlined the variables.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -142,6 +145,7 @@ public PendingCheckpoint(
         this.checkpointId = checkpointId;
         this.checkpointTimestamp = checkpointTimestamp;
         this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+        this.tasksToCommitTo = checkNotNull(tasksToCommitTo);

Review comment:
       I would also prefer to store the `CheckpointPlan` in `PendingCheckpoint`. The second PR does this as well, since there would be multiple collections to pass. I have now changed this PR to store the `CheckpointPlan` in `PendingCheckpoint`. 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
 
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
       I think it is much clearer to move the cache into `ExecutionAttemptMappingProvider`, so I moved it there.
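
For readers unfamiliar with the idiom in the diff above: overriding `LinkedHashMap.removeEldestEntry` gives a size-bounded map that drops its oldest entry once the limit is exceeded. A minimal sketch, assuming a fixed limit passed in at construction. `BoundedCache` is a hypothetical name, and this is not the code that ended up in `ExecutionAttemptMappingProvider`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded map: evicts the oldest inserted entry when full. */
final class BoundedCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedCache(int maxEntries) {
        // The argument is only the initial capacity; the default iteration
        // order of LinkedHashMap is insertion order.
        super(maxEntries);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called by put() after each insertion; returning true removes the
        // eldest entry, which keeps the map at no more than maxEntries.
        return size() > maxEntries;
    }
}
```

In the diff the limit is the number of tasks, so the bound follows whatever `getNumberOfTasks()` returns at the time of each insertion.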

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                         checkpointsCleaner,
                         new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        failureManager);
+                        failureManager,
+                        new CheckpointBriefCalculator(
+                                getJobID(),
+                                sourceAndAllVertices.f0,
+                                sourceAndAllVertices.f1,
+                                sourceAndAllVertices.f1),
+                        new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
       I would also like to move the creation inside `CheckpointPlanCalculator`. However, the unit tests currently rely on passing mock vertices directly when creating the `CheckpointCoordinator`, so I think we can postpone this change to the next PR. In the second PR we will refactor the current tests to be based on a real `ExecutionGraph`, and then we can move the creation into `CheckpointPlanCalculator`.
   
   I am a little concerned about making `ExecutionAttemptMappingProvider` depend on `CheckpointPlanCalculator`, since logically I think the two components serve separate purposes. Besides, we will also adjust the logic of `CheckpointPlanCalculator` in the next PR. Do you think it would also be OK to keep the two components separate and have both depend on `ExecutionGraph`?







[GitHub] [flink] gaoyunhaii commented on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-773262082


   Hi Roman @rkhachatryan, many thanks for the comments; they make the PR much clearer. I have updated the PR accordingly.





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566893536



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -90,6 +90,8 @@
 
     private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       I updated all the names to `tasksToCommitTo`.







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12941) 
   * e666fead74311e8b2d6d8359d3098a1e3f139234 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 721dd38bb286722cbb1a6925dbedbb3614930b20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410) 
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * e666fead74311e8b2d6d8359d3098a1e3f139234 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13000) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766088540


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e666fead74311e8b2d6d8359d3098a1e3f139234 (Fri May 28 08:57:27 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566910553



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
##########
@@ -183,23 +166,27 @@ public CheckpointStatsSnapshot createSnapshot() {
     /**
      * Creates a new pending checkpoint tracker.
      *
+     * @param tasksToAck Tasks to acknowledge in this checkpoint.
      * @param checkpointId ID of the checkpoint.
      * @param triggerTimestamp Trigger timestamp of the checkpoint.
      * @param props The checkpoint properties.
      * @return Tracker for statistics gathering.
      */
     PendingCheckpointStats reportPendingCheckpoint(
-            long checkpointId, long triggerTimestamp, CheckpointProperties props) {
+            List<ExecutionVertex> tasksToAck,

Review comment:
       The method has since changed, so we do not need to modify this for now.







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570167967



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
       I agree that making it unmodifiable is cleaner and less error-prone, so I made it unmodifiable and copy it in `PendingCheckpoint`. 
   
   I also inlined the variables.







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570164405



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {

Review comment:
       I would prefer `CheckpointPlan`, since it is easier to understand and does not overlap with existing names. I changed it to `CheckpointPlan`.







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565321454



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1924,7 +1918,8 @@ private void abortPendingCheckpoint(
                 }
             } finally {
                 sendAbortedMessages(
-                        pendingCheckpoint.getCheckpointId(),
+                        pendingCheckpoint.getRunningTasks(),
+                        pendingCheckpoint.getCheckpointID(),

Review comment:
       This was `getCheckpointId`; maybe we should keep using the same method in this call.







[GitHub] [flink] aljoscha commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567793944



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1935,6 +1935,7 @@ private void abortPendingCheckpoint(
                 }
             } finally {
                 sendAbortedMessages(
+                        pendingCheckpoint.getTasksToCommitTo(),

Review comment:
       Why don't we need to send it to the `tasksToWaitFor`? Or is it that `tasksToCommitTo` is a strict superset of `tasksToWaitFor`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responses to query the current attempt for the tasks and provide the mapping from the execution
+ * attempt id to its task.
+ */
+public class ExecutionAttemptMappingProvider {
+
+    private final List<ExecutionVertex> tasks;

Review comment:
       This works because the list of `ExecutionVertex` is currently static in the `ExecutionGraph` for the lifetime of the execution graph, I hope.
   
   It might make sense to turn this into an interface that `ExecutionGraph` implements to allow querying this information. It's just an idea, so please feel free to ignore for now.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                             createPendingCheckpoint(
                                                     timestamp,
                                                     request.props,
-                                                    ackTasks,
+                                                    checkpointBrief,

Review comment:
       I think both are good points. Whatever we decide, we could also change that in a later refactoring, right?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
 
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
       Why is it the number of all tasks and not just the `tasksToWaitFor`? I have the feeling that the code before was not correct when the number of tasks in the different states can change, right?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2138,12 +2080,10 @@ private void reportToStatsTracker(
         }
         Map<JobVertexID, Integer> vertices =
                 tasks.values().stream()
-                        .map(ExecutionVertex::getJobVertex)
-                        .distinct()
                         .collect(
-                                toMap(
-                                        ExecutionJobVertex::getJobVertexId,
-                                        ExecutionJobVertex::getParallelism));
+                                Collectors.groupingBy(
+                                        ExecutionVertex::getJobvertexId,
+                                        Collectors.reducing(0, e -> 1, Integer::sum)));

Review comment:
       I'm interested to know why this was necessary.
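
For context on the question, the two collectors in the diff above compute different things: the removed version mapped each job vertex to its configured parallelism, whereas the added version counts how many vertices of each job vertex actually appear in `tasks`. The results differ only when `tasks` holds a subset of the subtasks; whether that was the motivation is not stated in this thread. A minimal sketch of the counting collector, with a hypothetical `Vertex` class standing in for `ExecutionVertex` and string ids standing in for `JobVertexID`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SubtaskCountDemo {
    /** Hypothetical stand-in for ExecutionVertex; it only carries its job vertex id. */
    static final class Vertex {
        private final String jobVertexId;

        Vertex(String jobVertexId) {
            this.jobVertexId = jobVertexId;
        }

        String getJobVertexId() {
            return jobVertexId;
        }
    }

    /** Counts how many of the given vertices belong to each job vertex id. */
    static Map<String, Integer> countPerJobVertex(List<Vertex> tasks) {
        return tasks.stream()
                .collect(
                        Collectors.groupingBy(
                                Vertex::getJobVertexId,
                                // Map every vertex to 1 and sum within each group.
                                Collectors.reducing(0, v -> 1, Integer::sum)));
    }
}
```

Given two vertices with id `a` and one with id `b`, the method maps `a` to 2 and `b` to 1, regardless of the parallelism either job vertex was configured with.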







[GitHub] [flink] aljoscha commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566172768



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
##########
@@ -183,23 +166,27 @@ public CheckpointStatsSnapshot createSnapshot() {
     /**
      * Creates a new pending checkpoint tracker.
      *
+     * @param tasksToAck Tasks to acknowledge in this checkpoint.
      * @param checkpointId ID of the checkpoint.
      * @param triggerTimestamp Trigger timestamp of the checkpoint.
      * @param props The checkpoint properties.
      * @return Tracker for statistics gathering.
      */
     PendingCheckpointStats reportPendingCheckpoint(
-            long checkpointId, long triggerTimestamp, CheckpointProperties props) {
+            List<ExecutionVertex> tasksToAck,

Review comment:
       Should still be `tasksToWaitFor`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -825,7 +811,7 @@ private void snapshotTaskState(
             long checkpointID,
             CheckpointStorageLocation checkpointStorageLocation,
             CheckpointProperties props,
-            Execution[] executions,
+            List<Execution> executions,

Review comment:
       Here, a renaming might be helpful. This should really be `tasksToTrigger`, right?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -90,6 +90,8 @@
 
     private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       Consistent with my other comment, these should still be named `tasksToCommitTo` for now.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -730,7 +715,8 @@ private PendingCheckpoint createPendingCheckpoint(
 
         if (statsTracker != null) {
             PendingCheckpointStats callback =
-                    statsTracker.reportPendingCheckpoint(checkpointID, timestamp, props);
+                    statsTracker.reportPendingCheckpoint(

Review comment:
       Should we maybe use `tasksToWaitFor` here instead of `tasksToCommit`? I think in the future the two numbers might differ but `tasksToCommitTo` will strictly have to be a subset of `tasksToWaitFor` because it might be that not all participating tasks need the "notify-complete".
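
The relationship described above can be sketched as follows. This is a hypothetical illustration, not the PR's `CheckpointBrief`: the class name `CheckpointTaskSetsSketch`, the string task names, and the method `numberOfTasksForStats` are assumptions. It encodes the expectation that `tasksToCommitTo` is a subset of `tasksToWaitFor` and that statistics are sized by the tasks that must acknowledge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

/** Hypothetical sketch: the commit targets must be a subset of the tasks that acknowledge. */
final class CheckpointTaskSetsSketch {

    final List<String> tasksToWaitFor;
    final List<String> tasksToCommitTo;

    CheckpointTaskSetsSketch(List<String> tasksToWaitFor, List<String> tasksToCommitTo) {
        // Reject a commit target that never takes part in the checkpoint.
        if (!new HashSet<>(tasksToWaitFor).containsAll(tasksToCommitTo)) {
            throw new IllegalArgumentException("tasksToCommitTo must be a subset of tasksToWaitFor");
        }
        this.tasksToWaitFor = Collections.unmodifiableList(new ArrayList<>(tasksToWaitFor));
        this.tasksToCommitTo = Collections.unmodifiableList(new ArrayList<>(tasksToCommitTo));
    }

    /** The number a stats tracker would expect acknowledgements from. */
    int numberOfTasksForStats() {
        return tasksToWaitFor.size();
    }

    public static void main(String[] args) {
        CheckpointTaskSetsSketch sets =
                new CheckpointTaskSetsSketch(
                        Arrays.asList("source-0", "map-0", "sink-0"), Arrays.asList("sink-0"));
        System.out.println(sets.numberOfTasksForStats() + " tasks to wait for, "
                + sets.tasksToCommitTo.size() + " to notify on completion");
    }
}
```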

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {
+
+    /** Tasks who need to be sent a message when a checkpoint is started. */
+    private final List<Execution> tasksToTrigger;
+
+    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
+    private final Map<ExecutionAttemptID, ExecutionVertex> tasksToAck;
+
+    /**
+     * Tasks that are still running when taking the checkpoint, these need to be sent a message when
+     * the checkpoint is confirmed.
+     */
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       Agreed, I think we should keep `tasksToTrigger`, `tasksToWaitFor`, and `tasksToCommitTo` for now. We can change the names in a pure renaming commit after that if we think the names are not good anymore.
   
   (I think the names could be clearer so it's actually good to change them... 😅 )







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567764140



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                             createPendingCheckpoint(
                                                     timestamp,
                                                     request.props,
-                                                    ackTasks,
+                                                    checkpointBrief,

Review comment:
       Many thanks @guoweiM for the thoughts! I tend to prefer keeping function parameters "specific" and passing the composite object only if we rely on multiple properties of it. But I'm also open to other options. :-)







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565270255



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -448,14 +441,24 @@ public void failJobDueToTaskFailure(
                         new DispatcherThreadFactory(
                                 Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
 
+        List<ExecutionVertex> sourceVertices = new ArrayList<>();

Review comment:
       Maybe we could move this initialization logic into `CheckpointBriefComputer`, which would keep all the related logic together.







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565277280



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefComputer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefComputer {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefComputer.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommit;
+
+    public CheckpointBriefComputer(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommit) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommit = Collections.unmodifiableList(tasksToCommit);
+    }
+
+    public CheckpointBrief computeCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
+
+        return new CheckpointBrief(tasksToTrigger, ackTasks, tasksToCommit);
+    }
+
+    private List<Execution> getTriggerExecutions() throws CheckpointException {

Review comment:
       The two methods (`getTriggerExecutions` and `getAckTasks`) come from the `CheckpointCoordinator`. So maybe `getTriggerExecutions` could get a comment just like the one on `getAckTasks`. :-)
   







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744",
       "triggerID" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "916cc586e1ba5a0ecdb3941d2706a4ead1816af5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "916cc586e1ba5a0ecdb3941d2706a4ead1816af5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   * 916cc586e1ba5a0ecdb3941d2706a4ead1816af5 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570169618



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
 
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
       I think it would be much clearer to keep the cache in `ExecutionAttemptMappingProvider`, so I moved it there.
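
A rough sketch of what such an encapsulation could look like is given below. It is an assumption-laden illustration rather than the actual `ExecutionAttemptMappingProvider`: the class `AttemptMappingSketch`, its nested `Task` type, the string attempt IDs, and the refresh-on-miss policy are all hypothetical. The point is only that the bounded cache and the lookup live in one place, so the coordinator no longer needs to know how the bound is chosen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a provider that owns the attempt-ID-to-task cache; not Flink code. */
final class AttemptMappingSketch {

    /** Stand-in for a task whose current attempt ID changes when it is restarted. */
    static final class Task {
        final String name;
        String currentAttemptId;

        Task(String name, String currentAttemptId) {
            this.name = name;
            this.currentAttemptId = currentAttemptId;
        }
    }

    private final List<Task> tasks;
    private final Map<String, Task> cachedTasksById;

    AttemptMappingSketch(List<Task> tasks) {
        this.tasks = new ArrayList<>(tasks);
        final int maxEntries = this.tasks.size();
        // The cache never holds more entries than there are tasks.
        this.cachedTasksById =
                new LinkedHashMap<String, Task>(maxEntries) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Task> eldest) {
                        return size() > maxEntries;
                    }
                };
    }

    /** Looks up a task by attempt ID, refreshing the cache from the current attempts on a miss. */
    Optional<Task> getTask(String attemptId) {
        if (!cachedTasksById.containsKey(attemptId)) {
            for (Task task : tasks) {
                cachedTasksById.put(task.currentAttemptId, task);
            }
        }
        return Optional.ofNullable(cachedTasksById.get(attemptId));
    }

    public static void main(String[] args) {
        Task source = new Task("source-0", "attempt-1");
        Task sink = new Task("sink-0", "attempt-2");
        AttemptMappingSketch provider = new AttemptMappingSketch(Arrays.asList(source, sink));
        System.out.println(provider.getTask("attempt-1").isPresent());
        source.currentAttemptId = "attempt-3"; // simulate a restart of source-0
        System.out.println(provider.getTask("attempt-3").isPresent());
        System.out.println(provider.getTask("attempt-1").isPresent());
    }
}
```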







[GitHub] [flink] rkhachatryan commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570599733



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                         checkpointsCleaner,
                         new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        failureManager);
+                        failureManager,
+                        new CheckpointBriefCalculator(
+                                getJobID(),
+                                sourceAndAllVertices.f0,
+                                sourceAndAllVertices.f1,
+                                sourceAndAllVertices.f1),
+                        new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
       I share your concern and I also like your proposal :+1: 







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744",
       "triggerID" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410",
       "triggerID" : "721dd38bb286722cbb1a6925dbedbb3614930b20",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476",
       "triggerID" : "0c94f50f2a8e9a5753479fce0cb18e02a1931c16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669",
       "triggerID" : "39b84b60a20f67d813599bcea74ebbc28e9be573",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738",
       "triggerID" : "a3305f5a4c8c8e4d12b90130bdf72a43f5a96887",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740",
       "triggerID" : "67aad0ca126a0e74cf8449451af85506aba43fbb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744",
       "triggerID" : "116191180919cc10e800d2bae10fc4449cd90b43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570174748



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                         checkpointsCleaner,
                         new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        failureManager);
+                        failureManager,
+                        new CheckpointBriefCalculator(
+                                getJobID(),
+                                sourceAndAllVertices.f0,
+                                sourceAndAllVertices.f1,
+                                sourceAndAllVertices.f1),
+                        new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
       I would also prefer to move the creation inside `CheckpointPlanCalculator`. However, the unit tests currently rely on passing mock vertices directly when creating the `CheckpointCoordinator`, so I think we could postpone this change to the next PR. In that second PR we would refactor the current tests to be based on a real `ExecutionGraph`, and then we could move the creation inside `CheckpointPlanCalculator`.
   
   I'm a little concerned about making `ExecutionAttemptMappingProvider` depend on `CheckpointPlanCalculator`, since logically I think the two components serve two separate purposes. Besides, we will also adjust the logic of `CheckpointPlanCalculator` in the next PR. Do you think it would also be OK to keep the two components separate and have both depend on `ExecutionGraph`?







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12941) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 916cc586e1ba5a0ecdb3941d2706a4ead1816af5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12934) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12941) 
   



[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565270255



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -448,14 +441,24 @@ public void failJobDueToTaskFailure(
                         new DispatcherThreadFactory(
                                 Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
 
+        List<ExecutionVertex> sourceVertices = new ArrayList<>();

Review comment:
       Maybe we could move the following initialization logic into `CheckpointBriefComputer`, which would keep all the related logic together.







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 721dd38bb286722cbb1a6925dbedbb3614930b20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   * 916cc586e1ba5a0ecdb3941d2706a4ead1816af5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12934) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12941) 
   



[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565315896



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {
+
+    /** Tasks who need to be sent a message when a checkpoint is started. */
+    private final List<Execution> tasksToTrigger;
+
+    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
+    private final Map<ExecutionAttemptID, ExecutionVertex> tasksToAck;
+
+    /**
+     * Tasks that are still running when taking the checkpoint, these need to be sent a message when
+     * the checkpoint is confirmed.
+     */
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       Maybe we should rename `runningTasks` to `tasksToCommitTo`. This PR is a refactoring, so we could keep the existing concepts consistent for now. What do you think?
   







[GitHub] [flink] aljoscha commented on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-769173204


   One more thing, I think `CheckpointBriefCalculator` might sound more natural than `CheckpointBriefComputer`.





[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565277280



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefComputer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefComputer {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefComputer.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommit;
+
+    public CheckpointBriefComputer(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommit) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommit = Collections.unmodifiableList(tasksToCommit);
+    }
+
+    public CheckpointBrief computeCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
+
+        return new CheckpointBrief(tasksToTrigger, ackTasks, tasksToCommit);
+    }
+
+    private List<Execution> getTriggerExecutions() throws CheckpointException {

Review comment:
       The two methods (`getTriggerExecutions` and `getAckTasks`) come from `CheckpointCoordinator`, so maybe `getTriggerExecutions` could get a comment just like `getAckTasks` has. :)
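
A minimal sketch of the check that `getTriggerExecutions` is assumed to perform, inferred only from what the diff above shows (the method throws `CheckpointException` and the file imports `ExecutionState`): every task to trigger must currently be running, otherwise the checkpoint is declined. The `Task` and `State` types are hypothetical stand-ins for `ExecutionVertex`/`Execution` and `ExecutionState`, and `IllegalStateException` stands in for `CheckpointException`; this is not the actual Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class TriggerCheckSketch {

    /** Hypothetical, reduced set of task states (stand-in for ExecutionState). */
    enum State { CREATED, RUNNING, FINISHED }

    /** Hypothetical stand-in for a task with a current execution attempt. */
    static final class Task {
        final String name;
        final State state;

        Task(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    /**
     * Returns the tasks that should receive the trigger message.
     *
     * <p>Assumption: a checkpoint may only start if every task to trigger is
     * RUNNING; the first task in any other state aborts the whole attempt.
     */
    static List<Task> getTriggerExecutions(List<Task> tasksToTrigger) {
        List<Task> executions = new ArrayList<>(tasksToTrigger.size());
        for (Task task : tasksToTrigger) {
            if (task.state != State.RUNNING) {
                throw new IllegalStateException(
                        "Checkpoint triggering task " + task.name
                                + " is not running (state: " + task.state
                                + "). Aborting checkpoint.");
            }
            executions.add(task);
        }
        return executions;
    }
}
```

A javadoc comment like the one on the method above is the kind of documentation the review comment asks for.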
   







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   



[GitHub] [flink] aljoscha commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567838949



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2138,12 +2080,10 @@ private void reportToStatsTracker(
         }
         Map<JobVertexID, Integer> vertices =
                 tasks.values().stream()
-                        .map(ExecutionVertex::getJobVertex)
-                        .distinct()
                         .collect(
-                                toMap(
-                                        ExecutionJobVertex::getJobVertexId,
-                                        ExecutionJobVertex::getParallelism));
+                                Collectors.groupingBy(
+                                        ExecutionVertex::getJobvertexId,
+                                        Collectors.reducing(0, e -> 1, Integer::sum)));

Review comment:
       👍 
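
The replacement collector in the diff above can be tried in isolation. The following is a small sketch, not Flink code: `Subtask` is a hypothetical stand-in for `ExecutionVertex`, and a `String` replaces `JobVertexID`. It shows that `groupingBy` combined with `reducing(0, e -> 1, Integer::sum)` counts how many elements share each key, so the reported number reflects the subtasks actually present in `tasks` rather than the configured parallelism of the job vertex.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SubtaskCountSketch {

    /** Hypothetical stand-in for ExecutionVertex; only the job vertex id matters here. */
    static final class Subtask {
        private final String jobVertexId;

        Subtask(String jobVertexId) {
            this.jobVertexId = jobVertexId;
        }

        String getJobVertexId() {
            return jobVertexId;
        }
    }

    /** Counts the subtasks per job vertex id, mapping each element to 1 and summing. */
    static Map<String, Integer> countPerJobVertex(List<Subtask> subtasks) {
        return subtasks.stream()
                .collect(
                        Collectors.groupingBy(
                                Subtask::getJobVertexId,
                                Collectors.reducing(0, e -> 1, Integer::sum)));
    }
}
```

`Collectors.counting()` would produce the same grouping with `Long` values; the `reducing` form keeps the values as `Integer`, which matches the `Map<JobVertexID, Integer>` declared in the diff.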







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565321454



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1924,7 +1918,8 @@ private void abortPendingCheckpoint(
                 }
             } finally {
                 sendAbortedMessages(
-                        pendingCheckpoint.getCheckpointId(),
+                        pendingCheckpoint.getRunningTasks(),
+                        pendingCheckpoint.getCheckpointID(),

Review comment:
        Maybe we should use the same method in one call. `getCheckpointID`-> `getCheckpointID`  :)







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566909909



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -730,7 +715,8 @@ private PendingCheckpoint createPendingCheckpoint(
 
         if (statsTracker != null) {
             PendingCheckpointStats callback =
-                    statsTracker.reportPendingCheckpoint(checkpointID, timestamp, props);
+                    statsTracker.reportPendingCheckpoint(

Review comment:
       Got it, I agree. Since master has already evolved to use `tasksToWaitFor`, I just kept the latest behavior in this PR.







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * c1ecfa3e7a8bd5a6195009dcad6207881a15a077 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12941) 
   * e666fead74311e8b2d6d8359d3098a1e3f139234 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13000) 
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567823503



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                             createPendingCheckpoint(
                                                     timestamp,
                                                     request.props,
-                                                    ackTasks,
+                                                    checkpointBrief,

Review comment:
       I'm also +1 for implementing it in a later refactoring.







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567834139



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responses to query the current attempt for the tasks and provide the mapping from the execution
+ * attempt id to its task.
+ */
+public class ExecutionAttemptMappingProvider {
+
+    private final List<ExecutionVertex> tasks;

Review comment:
       I think the concern makes sense, since we are considering reactive scheduling right now. I think we could keep the static list in this PR for now: a lot of checkpoint-related tests currently rely on mocked execution vertices, and we will change them to use a real `ExecutionGraph` in the next PR; otherwise we would also need to modify the tests in this PR. And if we consider a dynamic `ExecutionGraph`, we might also need an end-to-end modification so that initializing the cached mapping does not depend on the number of tasks (e.g., in `removeEldestEntry()`).







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567528797



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
       Maybe `tasksToWait` could also be wrapped in an `unmodifiableMap`.
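To illustrate the suggestion, here is a minimal sketch of wrapping a map in a read-only view with `Collections.unmodifiableMap`. The class name `UnmodifiableMapSketch`, the method `readOnlyView`, and the `String` keys and values are hypothetical stand-ins for illustration only; the real code would use `ExecutionAttemptID` and `ExecutionVertex`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not part of the PR. */
class UnmodifiableMapSketch {

    /** Returns a read-only view; writes through the view throw UnsupportedOperationException. */
    static Map<String, String> readOnlyView(Map<String, String> tasksToWait) {
        return Collections.unmodifiableMap(tasksToWait);
    }

    public static void main(String[] args) {
        Map<String, String> tasksToWait = new HashMap<>();
        tasksToWait.put("attempt-1", "vertex-A");

        Map<String, String> view = readOnlyView(tasksToWait);
        try {
            view.put("attempt-2", "vertex-B");
        } catch (UnsupportedOperationException e) {
            // The caller cannot modify the map handed out by the calculator.
            System.out.println("modification rejected");
        }
    }
}
```

Note that this is only a view: changes made to the backing map are still visible through it, so the backing map should not be mutated after it is handed out.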

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2122,7 +2065,7 @@ public boolean isForce() {
 
     private Optional<ExecutionVertex> getVertex(ExecutionAttemptID id) throws CheckpointException {

Review comment:
       This method no longer throws `CheckpointException`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -563,7 +554,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                             createPendingCheckpoint(
                                                     timestamp,
                                                     request.props,
-                                                    ackTasks,
+                                                    checkpointBrief,

Review comment:
       My understanding is that the `CheckpointBrief` describes how to "get" a `CompletedCheckpoint`. There are several methods related to getting a `CompletedCheckpoint`, such as `createPendingCheckpoint`/`reportToStateTracker`/`sendAbortedMessage`/`sendAcknowledgeMessages`.
   
   However, these methods sometimes use the `CheckpointBrief` and sometimes use `tasksToWaitFor`/`tasksToTriggerFor`, which are internal state of the `CheckpointCoordinator`. After the refactoring, maybe all of these methods could take only the `CheckpointBrief` as their parameter. (I think the checkpoint ID and timestamp should also be in the `CheckpointBrief`.)
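A rough sketch of the shape this suggestion points toward, assuming a single immutable object carries everything a checkpoint-related method needs. All names here (`PlanSketch`, `CoordinatorSketch`, `describeAbort`) are hypothetical, and tasks are represented as plain strings rather than `Execution`/`ExecutionVertex`; this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical value object bundling the per-checkpoint information. */
final class PlanSketch {
    private final long checkpointId;
    private final long timestamp;
    private final List<String> tasksToTrigger;
    private final List<String> tasksToWaitFor;
    private final List<String> tasksToCommitTo;

    PlanSketch(
            long checkpointId,
            long timestamp,
            List<String> tasksToTrigger,
            List<String> tasksToWaitFor,
            List<String> tasksToCommitTo) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        // Defensive copies, so the plan cannot change after it is computed.
        this.tasksToTrigger = Collections.unmodifiableList(new ArrayList<>(tasksToTrigger));
        this.tasksToWaitFor = Collections.unmodifiableList(new ArrayList<>(tasksToWaitFor));
        this.tasksToCommitTo = Collections.unmodifiableList(new ArrayList<>(tasksToCommitTo));
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    List<String> getTasksToTrigger() { return tasksToTrigger; }
    List<String> getTasksToWaitFor() { return tasksToWaitFor; }
    List<String> getTasksToCommitTo() { return tasksToCommitTo; }
}

/** Hypothetical coordinator method that takes only the plan as its parameter. */
class CoordinatorSketch {
    static String describeAbort(PlanSketch plan) {
        // No coordinator-internal task lists are consulted here.
        return "abort checkpoint " + plan.getCheckpointId() + " on " + plan.getTasksToCommitTo();
    }

    public static void main(String[] args) {
        PlanSketch plan =
                new PlanSketch(
                        7L,
                        1000L,
                        Arrays.asList("source"),
                        Arrays.asList("source", "sink"),
                        Arrays.asList("sink"));
        System.out.println(describeAbort(plan));
    }
}
```

The design point is that each method reads its inputs from one per-checkpoint object, so two checkpoints with different task sets cannot accidentally share coordinator state.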

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1843,6 +1843,7 @@ int getNumQueuedRequests() {
 
     public void reportStats(long id, ExecutionAttemptID attemptId, CheckpointMetrics metrics)
             throws CheckpointException {
+

Review comment:
       Maybe we could remove this blank line.







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb UNKNOWN
   



[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r566879860



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -448,14 +441,24 @@ public void failJobDueToTaskFailure(
                         new DispatcherThreadFactory(
                                 Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
 
+        List<ExecutionVertex> sourceVertices = new ArrayList<>();

Review comment:
       I'll put it in a separate method first, so that we do not need to modify too many unit tests for now. In the next PR I'll move it further into `CheckpointBriefCalculator`.




[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r567823301



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                         this.minPauseBetweenCheckpoints,
                         this.pendingCheckpoints::size,
                         this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
 
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
       Yes. Since an incomplete task's metrics are reported after its checkpoint is declined, if new checkpoints are triggered and `tasksToWaitFor` changes between the time the first checkpoint is declined and the time the metrics are reported, some tasks might be missing from the cache. Thus it would be better to always consider all the tasks.
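For readers unfamiliar with the pattern in the diff above, here is a minimal, self-contained sketch of a size-bounded cache built on `LinkedHashMap.removeEldestEntry`. The class names `BoundedCache` and `BoundedCacheDemo` and the string keys are hypothetical; the PR itself uses an anonymous subclass keyed by `ExecutionAttemptID` and bounded by the total number of tasks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a size-bounded map; not the Flink implementation. */
class BoundedCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedCache(int maxEntries) {
        super(maxEntries);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; returning true evicts the oldest entry.
        return size() > maxEntries;
    }
}

class BoundedCacheDemo {
    public static void main(String[] args) {
        BoundedCache<String, String> cache = new BoundedCache<>(2);
        cache.put("attempt-1", "task-A");
        cache.put("attempt-2", "task-B");
        cache.put("attempt-3", "task-C"); // exceeds the bound, so attempt-1 is evicted
        System.out.println(cache.keySet()); // prints [attempt-2, attempt-3]
    }
}
```

Bounding the cache by the total number of tasks, rather than by the size of one checkpoint's wait set, means an entry for a task in an older, declined checkpoint is less likely to be evicted before its metrics arrive.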







[GitHub] [flink] gaoyunhaii commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r570169014



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -142,6 +145,7 @@ public PendingCheckpoint(
         this.checkpointId = checkpointId;
         this.checkpointTimestamp = checkpointTimestamp;
         this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+        this.tasksToCommitTo = checkNotNull(tasksToCommitTo);

Review comment:
       I also lean toward storing the `CheckpointPlan` in `PendingCheckpoint`. We do this in the second PR anyway, since there would be multiple collections to pass, so I have changed this PR to store the `CheckpointPlan` in `PendingCheckpoint` as well.







[GitHub] [flink] guoweiM commented on a change in pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #14734:
URL: https://github.com/apache/flink/pull/14734#discussion_r565315896



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {
+
+    /** Tasks who need to be sent a message when a checkpoint is started. */
+    private final List<Execution> tasksToTrigger;
+
+    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
+    private final Map<ExecutionAttemptID, ExecutionVertex> tasksToAck;
+
+    /**
+     * Tasks that are still running when taking the checkpoint, these need to be sent a message when
+     * the checkpoint is confirmed.
+     */
+    private final List<ExecutionVertex> runningTasks;

Review comment:
       Maybe we could rename `runningTasks` to `tasksToCommitTo`. This PR is a refactoring, so we could keep the concepts consistent for now.







[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 721dd38bb286722cbb1a6925dbedbb3614930b20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12410) 
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14734: [FLINK-21066][runtime][checkpoint] Refactor CheckpointCoordinator to compute tasks to trigger/ack/commit dynamically

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14734:
URL: https://github.com/apache/flink/pull/14734#issuecomment-766092219


   ## CI report:
   
   * 0c94f50f2a8e9a5753479fce0cb18e02a1931c16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12476) 
   * 39b84b60a20f67d813599bcea74ebbc28e9be573 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12669) 
   * a3305f5a4c8c8e4d12b90130bdf72a43f5a96887 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12738) 
   * 67aad0ca126a0e74cf8449451af85506aba43fbb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12740) 
   * 116191180919cc10e800d2bae10fc4449cd90b43 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12744) 
   