Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/13 22:46:25 UTC

[GitHub] [flink] StephanEwen opened a new pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

StephanEwen opened a new pull request #12137:
URL: https://github.com/apache/flink/pull/12137


   ## What is the purpose of the change
   
   This Pull Request contains a series of changes that ultimately serve two goals:
   
     1. Complete checkpointing/restoring state for OperatorCoordinators (previously restores were not implemented)
     2. Only restore operator coordinators after "global failures" or "global restores".
   
   The distinction between global failures/restores and task failures/restores means the following:
     - When a task fails, that task and others (in the same failover region) are restarted with the latest checkpoint state. In that case, the Operator Coordinators only get notifications; the coordinators are not reset to the previous checkpoint, because some subtasks may continue to run.
     - When a global failure happens (for example as part of a master failover, a change in JobManager leader status, or through the safety net in the scheduler), all tasks are reset to the previous state, and likewise the coordinators are reset to the previous checkpoint. In those situations, there would be a danger of inconsistencies if the coordinator did not reset to the checkpoint.
     - A restore from a savepoint is likewise a global restore (it is a special case of a change in leader status)
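
A minimal sketch of the distinction described above, assuming a hypothetical split-assigning coordinator. The class `SketchSplitCoordinator` and its methods are illustrative stand-ins and are not Flink's actual `OperatorCoordinator` interface. A task failure only triggers a notification that adjusts live state, while a global failure replaces the coordinator state with the checkpointed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified coordinator; not Flink's actual OperatorCoordinator interface.
class SketchSplitCoordinator {
    private Map<Integer, Integer> assigned = new HashMap<>(); // subtask -> number of splits held
    private int unassigned;

    SketchSplitCoordinator(int totalSplits) {
        this.unassigned = totalSplits;
    }

    void assign(int subtask, int count) {
        unassigned -= count;
        assigned.merge(subtask, count, Integer::sum);
    }

    // Task (regional) failure: notification only. Splits of the failed subtask return
    // to the pool; state that belongs to still-running subtasks is left untouched.
    void subtaskFailed(int subtask) {
        Integer returned = assigned.remove(subtask);
        if (returned != null) {
            unassigned += returned;
        }
    }

    // Snapshot of the coordinator state, taken as part of a checkpoint.
    Snapshot checkpoint() {
        return new Snapshot(new HashMap<>(assigned), unassigned);
    }

    // Global failure or restore: all tasks go back to the checkpoint, so the
    // coordinator discards its live state and adopts the checkpointed state.
    void resetToCheckpoint(Snapshot snapshot) {
        assigned = new HashMap<>(snapshot.assigned);
        unassigned = snapshot.unassigned;
    }

    int unassigned() {
        return unassigned;
    }

    int assignedTo(int subtask) {
        return assigned.getOrDefault(subtask, 0);
    }

    static final class Snapshot {
        final Map<Integer, Integer> assigned;
        final int unassigned;

        Snapshot(Map<Integer, Integer> assigned, int unassigned) {
            this.assigned = assigned;
            this.unassigned = unassigned;
        }
    }
}
```

In this sketch, failing subtask 0 leaves the splits of subtask 1 in place, which mirrors the point that some subtasks may continue to run after a task failure.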
   
   ## Brief change log
   
   Changes 42f3576, ad78321, 16c1524, 2a3fabc, 5e8dfd8, e6063a8 are simple refactorings and fixes of minor bugs. These commits should be self-contained and not too complex.
   
   11bc22f improves the failure handling by sending failure notifications to the `OperatorCoordinator` as soon as a task failed, and not only when it gets recovered. That makes failure handling in the coordinator faster.
   
   6452bd9 simply forwards the exception that caused the failure originally to the `OperatorCoordinator.subtaskFailed()` method.
   
   40f9067 Completes the remainder of the checkpointing integration. It fixes various previous issues and makes sure that `restore()` is always called on the `OperatorCoordinator` whenever a checkpoint is restored.
   
   a7e2a16 Introduces the two different methods in the `CheckpointCoordinator`:
     - `restoreLatestCheckpointedStateToAll(...)` for restores after global failures. This is the same as the previous behavior. This method also restores `OperatorCoordinators`.
     - `restoreLatestCheckpointedStateToSubtasks` for restores of (a subset of) subtasks only. The method is initially unused.
   The commit also removes the `failIfNoCheckpoint` flag, which was only used during savepoint restores. Because savepoint restores have their dedicated method in the `CheckpointCoordinator`, this flag was never used in the production code and was not re-included in the two new methods.
   
   2c5bd3b Changes the scheduler to call the two different methods introduced in the previous commits in the respective situations (task failure versus global failure).
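
The dispatch between the two restore paths can be sketched roughly as follows. This is an illustration under stated assumptions: `SketchRestorer`, `RecordingRestorer`, and `SketchRestoreDispatch` are hypothetical names, vertices are plain strings, and the method signatures are simplified (the real `CheckpointCoordinator` methods take Flink types).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the two restore entry points named above; the real
// CheckpointCoordinator methods have different parameter types.
interface SketchRestorer {
    void restoreLatestCheckpointedStateToAll(Set<String> allVertices);

    void restoreLatestCheckpointedStateToSubtasks(Set<String> verticesToRestart);
}

// Records which entry point was used, so that the dispatch can be observed.
class RecordingRestorer implements SketchRestorer {
    final List<String> calls = new ArrayList<>();

    @Override
    public void restoreLatestCheckpointedStateToAll(Set<String> allVertices) {
        // In the PR, this path also restores the operator coordinators.
        calls.add("all:" + allVertices.size());
    }

    @Override
    public void restoreLatestCheckpointedStateToSubtasks(Set<String> verticesToRestart) {
        // Task-only restore: coordinators are notified elsewhere, not reset.
        calls.add("subtasks:" + verticesToRestart.size());
    }
}

class SketchRestoreDispatch {
    // Chooses the restore path based on whether the failure was global.
    static void restoreState(
            SketchRestorer restorer,
            Set<String> allVertices,
            Set<String> verticesToRestart,
            boolean globalFailure) {
        if (globalFailure) {
            restorer.restoreLatestCheckpointedStateToAll(allVertices);
        } else {
            restorer.restoreLatestCheckpointedStateToSubtasks(verticesToRestart);
        }
    }
}
```

Note that a regional failover may still restart every vertex; in this sketch it would nonetheless go through the subtask path, because the decision depends on the failure type rather than on the number of restarted vertices.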
   
   
   ## Verifying this change
   
   The changes here are covered by various additional unit tests, specifically in the class `OperatorCoordinatorSchedulerTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no** (not a user-facing feature)
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] becketqin commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
becketqin commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426157226



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -186,10 +188,18 @@ private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
 
 	private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
 		setGlobalFailureCause(error);
+		notifyCoordinatorsAboutTaskFailure(executionVertexId, error);

Review comment:
       Sounds good to me. Maybe we can just remove it in this PR. The code is here:
   https://github.com/apache/flink/blob/6f64213dbdfd0b1e75f503007dd026162881c9c7/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L252







[GitHub] [flink] flinkbot commented on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628285539


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 (Wed May 13 22:48:19 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-16357).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] asfgit closed pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #12137:
URL: https://github.com/apache/flink/pull/12137


   





[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1342",
       "triggerID" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 282fa84594e24e0028e0e87d882c6cd656fcd18b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1342) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425754123



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -122,7 +135,20 @@ public boolean canRestart() {
 	 * @return result of a set of tasks to restart to recover from the failure
 	 */
 	public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {

Review comment:
       Minor: If we make `globalFailure` a param instead of adding new factory methods, it seems the changes in both `FailureHandlingResult` and `ExecutionFailureHandler` can be much simpler.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
 			boolean errorIfNoCheckpoint,
 			boolean allowNonRestoredState) throws Exception {
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
 	}
 
 	/**
-	 * Restores the latest checkpointed state.
+	 * Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+	 * or "regional" failover and does restore states to coordinators. Note that a regional failover
+	 * might still include all tasks.
+	 *
+	 * @param tasks Set of job vertices to restore. State for these vertices is
+	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+	 * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+	 * @throws IllegalStateException If no completed checkpoint is available and
+	 *                               the <code>failIfNoCheckpoint</code> flag has been set.
+	 * @throws IllegalStateException If the checkpoint contains state that cannot be
+	 *                               mapped to any job vertex in <code>tasks</code> and the
+	 *                               <code>allowNonRestoredState</code> flag has not been set.
+	 * @throws IllegalStateException If the max parallelism changed for an operator
+	 *                               that restores state from this checkpoint.
+	 * @throws IllegalStateException If the parallelism changed for an operator
+	 *                               that restores <i>non-partitioned</i> state from this
+	 *                               checkpoint.
+	 */
+	public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
+		// when restoring subtasks only we accept potentially unmatched state because what we

Review comment:
       I think another reason why `allowNonRestoredState` must be true is that not all `JobVertex` instances are checked, so the state of an absent `JobVertex` cannot be matched.
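
The point in this comment can be illustrated with a small sketch. It assumes a hypothetical helper `SketchStateMatching` in which operators and vertices are plain strings; it is not the matching logic of the real `CheckpointCoordinator`. When only a subset of vertices is passed in, checkpointed state for the absent vertices necessarily looks unmatched, so a strict check would fail.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; names and types are illustrative only.
class SketchStateMatching {
    // Operators that have checkpointed state but no vertex in the restore set.
    static Set<String> unmatched(Set<String> operatorsWithState, Set<String> verticesToRestore) {
        Set<String> result = new HashSet<>(operatorsWithState);
        result.removeAll(verticesToRestore);
        return result;
    }

    // Throws if state cannot be mapped to a vertex and non-restored state is not allowed.
    static void checkMatching(
            Set<String> operatorsWithState,
            Set<String> verticesToRestore,
            boolean allowNonRestoredState) {
        Set<String> missing = unmatched(operatorsWithState, verticesToRestore);
        if (!missing.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("State cannot be mapped to any vertex: " + missing);
        }
    }
}
```

With checkpointed state for `source`, `map`, and `sink` and a restore set containing only `map`, the strict variant throws, while the lenient variant passes.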







[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1342",
       "triggerID" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217) 
   * 282fa84594e24e0028e0e87d882c6cd656fcd18b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1342) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628285539


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 282fa84594e24e0028e0e87d882c6cd656fcd18b (Fri Oct 16 10:33:22 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] StephanEwen commented on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628702948


   @zhuzhurk the final commit (2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95) might be of interest to you, based on the discussion on Jira.





[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829228



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
 			boolean errorIfNoCheckpoint,
 			boolean allowNonRestoredState) throws Exception {
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
 	}
 
 	/**
-	 * Restores the latest checkpointed state.
+	 * Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+	 * or "regional" failover and does restore states to coordinators. Note that a regional failover
+	 * might still include all tasks.
+	 *
+	 * @param tasks Set of job vertices to restore. State for these vertices is
+	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+	 * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+	 * @throws IllegalStateException If no completed checkpoint is available and
+	 *                               the <code>failIfNoCheckpoint</code> flag has been set.
+	 * @throws IllegalStateException If the checkpoint contains state that cannot be
+	 *                               mapped to any job vertex in <code>tasks</code> and the
+	 *                               <code>allowNonRestoredState</code> flag has not been set.
+	 * @throws IllegalStateException If the max parallelism changed for an operator
+	 *                               that restores state from this checkpoint.
+	 * @throws IllegalStateException If the parallelism changed for an operator
+	 *                               that restores <i>non-partitioned</i> state from this
+	 *                               checkpoint.
+	 */
+	public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
+		// when restoring subtasks only we accept potentially unmatched state because what we

Review comment:
       Thanks, will add this to the comment.







[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "282fa84594e24e0028e0e87d882c6cd656fcd18b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217) 
   * 282fa84594e24e0028e0e87d882c6cd656fcd18b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425829755



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -122,7 +135,20 @@ public boolean canRestart() {
 	 * @return result of a set of tasks to restart to recover from the failure
 	 */
 	public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {

Review comment:
       Happy to do that, it is simpler indeed. Originally, I wanted to preserve the "spirit of the class", with its descriptive factory methods. But I think you are right, the simplicity argument here is stronger.
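
A rough sketch of the simpler shape discussed here, assuming a reduced, hypothetical `SketchFailureHandlingResult` with string vertex IDs. It is not the actual `FailureHandlingResult` class; it only shows a single factory method that takes `globalFailure` as a parameter instead of one factory method per case.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, reduced variant of a failure-handling result; vertex IDs are plain strings.
final class SketchFailureHandlingResult {
    private final Set<String> verticesToRestart;
    private final long restartDelayMs;
    private final boolean globalFailure;

    private SketchFailureHandlingResult(
            Set<String> verticesToRestart, long restartDelayMs, boolean globalFailure) {
        this.verticesToRestart = Collections.unmodifiableSet(new HashSet<>(verticesToRestart));
        this.restartDelayMs = restartDelayMs;
        this.globalFailure = globalFailure;
    }

    // Single factory method: the caller states whether the failure is global,
    // so no separate "restartable for global failure" factory is needed.
    static SketchFailureHandlingResult restartable(
            Set<String> verticesToRestart, long restartDelayMs, boolean globalFailure) {
        if (restartDelayMs < 0) {
            throw new IllegalArgumentException("Restart delay must not be negative.");
        }
        return new SketchFailureHandlingResult(verticesToRestart, restartDelayMs, globalFailure);
    }

    Set<String> getVerticesToRestart() {
        return verticesToRestart;
    }

    long getRestartDelayMs() {
        return restartDelayMs;
    }

    boolean isGlobalFailure() {
        return globalFailure;
    }
}
```

The trade-off is the one named in the comment: descriptive factory names are lost, but callers that already know the failure type pass it through one code path.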







[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426145923



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -487,6 +488,15 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis
 		}
 	}
 
+	public static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(DataInputStream dis) throws IOException {

Review comment:
       True, will add that.







[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426146031



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -468,6 +468,7 @@ public static void serializeStreamStateHandle(
 		dos.flush();
 	}
 
+	@Nullable

Review comment:
       Returning `Optional` here would mean reworking the method. I usually try to keep methods as they are unless they are in really bad shape.
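
For readers weighing the two conventions, here is a minimal sketch of a nullable-returning method next to an `Optional` wrapper that leaves the original method untouched. `SketchHandleCodec` and its one-byte marker format are assumptions made for illustration and do not reflect the serialization format in `MetadataV2V3SerializerBase`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical codec; the marker-byte layout is an assumption for this sketch.
class SketchHandleCodec {
    // Existing-style method: returns null when the marker byte says "no handle".
    // In Flink code such a method would carry the @Nullable annotation.
    static String decodeOrNull(byte[] data) {
        if (data.length == 0 || data[0] == 0) {
            return null;
        }
        return new String(data, 1, data.length - 1, StandardCharsets.UTF_8);
    }

    // Optional-style alternative, added alongside without reworking the method above.
    static Optional<String> decode(byte[] data) {
        return Optional.ofNullable(decodeOrNull(data));
    }
}
```

Callers of `decodeOrNull` must remember the null check, while callers of `decode` are steered towards handling the empty case by the return type.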







[GitHub] [flink] flinkbot edited a comment on pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12137:
URL: https://github.com/apache/flink/pull/12137#issuecomment-628291485


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217",
       "triggerID" : "2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c5bd3b07c3bf5f8854fb02984869af2ee5c0c95 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1217) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] StephanEwen commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426152842



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -186,10 +188,18 @@ private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
 
 	private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
 		setGlobalFailureCause(error);
+		notifyCoordinatorsAboutTaskFailure(executionVertexId, error);

Review comment:
       I think we may need to remove that check anyways. It is quite possible that tasks fail immediately after they were deployed, before they could register.
   
   The `OperatorCoordinator` interface also has no notion of "registering", so it cannot decide which notifications to forward and which not. I'd rather err on the "forward too many" side, because missing a notification easily leads to a stall or data loss (split not added back, result is missing).
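
A sketch of what "erring on the side of forwarding too many notifications" could look like on the receiving side, assuming a hypothetical `SketchTolerantCoordinator` that tracks split assignments as strings. It is not the actual `SourceCoordinator` code. The failure handler accepts notifications for subtasks it has never seen and accepts repeated notifications for the same subtask.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator state; names are illustrative only.
class SketchTolerantCoordinator {
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    private final List<String> pendingSplits = new ArrayList<>();

    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    // Tolerates unknown subtasks and duplicate notifications: if nothing is
    // assigned to the subtask (any more), the call is a no-op instead of an error.
    void subtaskFailed(int subtask) {
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            pendingSplits.addAll(splits); // add the splits back for reassignment
        }
    }

    List<String> pendingSplits() {
        return new ArrayList<>(pendingSplits);
    }
}
```

Because the second notification for the same subtask finds no assignment, splits are added back exactly once even when a task-only failover is followed by a global failover.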







[GitHub] [flink] becketqin commented on a change in pull request #12137: [FLINK-16177][FLINK-16357][checkpointing] Complete / improve checkpointing for OperatorCoordinators

Posted by GitBox <gi...@apache.org>.
becketqin commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r426112436



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -468,6 +468,7 @@ public static void serializeStreamStateHandle(
 		dos.flush();
 	}
 
+	@Nullable

Review comment:
       Minor: another option is to return an `Optional`. Currently both styles are used side by side. It might be good to have a convention.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -487,6 +488,15 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis
 		}
 	}
 
+	public static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(DataInputStream dis) throws IOException {

Review comment:
       Nit: It seems the return value is also `Nullable`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -186,10 +188,18 @@ private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState,
 
 	private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
 		setGlobalFailureCause(error);
+		notifyCoordinatorsAboutTaskFailure(executionVertexId, error);

Review comment:
       It looks like the failover may initially restart only tasks and fall back to a global failover if the task-only failover encounters an exception. This may cause `OperatorCoordinator.subtaskFailed()` to be invoked twice consecutively on the same subtask. Currently `SourceCoordinator` throws an exception if `subtaskFailed()` is invoked on a subtask that has not been registered. We may need to remove that check.



