Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/05 08:01:05 UTC

[GitHub] [flink] becketqin commented on a change in pull request #13044: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource

becketqin commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r465535249



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 							request.getOnCompletionFuture()),
 						timer);
 
-			final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-					.thenCompose(this::snapshotMasterState);
-
 			final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 					.thenComposeAsync((pendingCheckpoint) ->
 							OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
+			// We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+			// ExternallyInducedSource is used.
+			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+					.thenComposeAsync(ignored -> {
+						PendingCheckpoint checkpoint =
+							FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
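       Exceptions are handled centrally at line 552. This lambda only runs if `pendingCheckpointCompletableFuture` completed normally, because `coordinatorCheckpointsComplete` is chained on it.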
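A minimal sketch of that argument, using only `java.util.concurrent` (no Flink classes): the master-hook stage is chained on the coordinator stage, which is chained on the pending-checkpoint future, so its lambda only runs when the pending checkpoint completed normally, and any upstream failure surfaces once, in a single `handle` at the end of the chain. `getNowOrNull` is a hypothetical helper assumed to play the role `FutureUtils.getWithoutException` plays in the patch (value if completed normally, otherwise `null`); the stage names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class CentralErrorHandlingSketch {

    // Hypothetical helper: the value if the future completed normally, otherwise null.
    static <T> T getNowOrNull(CompletableFuture<T> future) {
        return future.isDone() && !future.isCompletedExceptionally() ? future.join() : null;
    }

    // Dependent stages wrap upstream failures in a CompletionException; unwrap it.
    static Throwable rootCause(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static String run(boolean failPending, AtomicBoolean masterStageRan) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pending = failPending
                    ? CompletableFuture.<String>failedFuture(new IllegalStateException("pending checkpoint failed"))
                    : CompletableFuture.completedFuture("checkpoint-1");

            // Stand-in for the coordinator checkpoints, chained on the pending checkpoint.
            CompletableFuture<Void> coordinatorsDone = pending.thenAcceptAsync(cp -> { }, timer);

            // Stand-in for the master hooks, chained on the coordinator stage.
            CompletableFuture<String> masterDone = coordinatorsDone.thenComposeAsync(ignored -> {
                masterStageRan.set(true);
                // Only reached if 'pending' completed normally, so this is never null here.
                String checkpoint = getNowOrNull(pending);
                return CompletableFuture.completedFuture("master state of " + checkpoint);
            }, timer);

            // The single central place where a failure anywhere upstream is observed.
            return masterDone
                    .handle((result, error) -> error == null
                            ? "ok: " + result
                            : "failed: " + rootCause(error).getMessage())
                    .join();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false, new AtomicBoolean()));
        System.out.println(run(true, new AtomicBoolean()));
    }
}
```

In the failure case the master-hook lambda never executes, which is why no extra exception handling is needed inside it.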

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that the checkpoint still behaves correctly when the task checkpoint is triggered by the
+	 * master hooks and finishes before the master checkpoint.
+	 */

Review comment:
       This test covers the checkpoint failure issue. The other case, the ordering, is tested indirectly by other tests, which ensure that all `OperatorCoordinators` have finished their snapshots before moving on to the next step. We can add another test that checks the ordering explicitly.
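An explicit ordering test could look roughly like the sketch below. It uses plain `CompletableFuture`s rather than the real `CheckpointCoordinator` test fixtures; the class name, the event names, and the deliberately slow (50 ms) coordinator are illustrative assumptions. Because the master hooks are chained on the coordinator future rather than on the pending-checkpoint future, the hook cannot run before the delayed coordinator snapshot finishes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CheckpointOrderingSketch {

    // Runs pending -> coordinator snapshot -> master hook and returns the observed event order.
    static List<String> triggerCheckpoint() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService timer = Executors.newSingleThreadExecutor();
        ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Long> pending = CompletableFuture.completedFuture(1L);

            // The coordinator snapshot completes later, on a different thread.
            CompletableFuture<Void> coordinatorsDone = pending.thenComposeAsync(
                    id -> CompletableFuture.runAsync(() -> {
                        sleepQuietly(50);
                        events.add("coordinator-snapshot-" + id);
                    }, coordinatorThread),
                    timer);

            // Master hooks are chained on the coordinator future, not on 'pending'.
            CompletableFuture<Void> masterDone =
                    coordinatorsDone.thenRunAsync(() -> events.add("master-hook-1"), timer);

            masterDone.join();
            return new ArrayList<>(events);
        } finally {
            timer.shutdown();
            coordinatorThread.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(triggerCheckpoint());
    }
}
```

Chaining the hook directly on `pending` instead (as the pre-patch code did) would make this order nondeterministic, which is what such a test would guard against.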

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 							request.getOnCompletionFuture()),
 						timer);
 
-			final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-					.thenCompose(this::snapshotMasterState);
-
 			final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 					.thenComposeAsync((pendingCheckpoint) ->
 							OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
+			// We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+			// ExternallyInducedSource is used.
+			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+					.thenComposeAsync(ignored -> {

Review comment:
       That is true. I changed it to `thenComposeAsync` primarily for safety. While it is quite clear that the code _consuming_ `coordinatorCheckpointsComplete` executes in the `timer`, it takes a few clicks to confirm that the code _completing_ `coordinatorCheckpointsComplete` also runs in the `timer`.
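The thread-affinity difference behind this choice can be checked in isolation. In the sketch below (executor and thread names are illustrative, not Flink's), `thenCompose` runs its function on whichever thread completes the upstream future, while `thenComposeAsync(fn, timer)` always runs it on `timer`, so the async variant does not rely on knowing which thread completes `coordinatorCheckpointsComplete`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeAsyncThreadSketch {

    // Returns {thread that ran the thenCompose function, thread that ran the thenComposeAsync function}.
    static String[] run() {
        ExecutorService timer = Executors.newSingleThreadExecutor(r -> new Thread(r, "timer"));
        ExecutorService other = Executors.newSingleThreadExecutor(r -> new Thread(r, "other"));
        try {
            CompletableFuture<Void> upstream = new CompletableFuture<>();

            // Synchronous variant: runs on whichever thread completes 'upstream'.
            CompletableFuture<String> viaCompose = upstream.thenCompose(
                    ignored -> CompletableFuture.completedFuture(Thread.currentThread().getName()));

            // Async variant: always runs on the given executor.
            CompletableFuture<String> viaComposeAsync = upstream.thenComposeAsync(
                    ignored -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                    timer);

            // Complete the upstream future from a thread that is NOT the timer.
            CompletableFuture.runAsync(() -> upstream.complete(null), other).join();

            return new String[] {viaCompose.join(), viaComposeAsync.join()};
        } finally {
            timer.shutdown();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("thenCompose ran on: " + names[0]);
        System.out.println("thenComposeAsync ran on: " + names[1]);
    }
}
```

The cost of the async variant is one extra task hop through the executor; in exchange, the thread a stage runs on is visible at the call site.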

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -576,7 +583,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 										request.advanceToEndOfTime);
 
 									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+									// It is possible that the tasks have finished checkpointing at this point,
+									// so we need to complete this pending checkpoint.

Review comment:
       This is not only about the threading model within the JM. Even if the JM has a single thread, once the master hooks trigger the external system to start the checkpoint, the tasks will still send checkpoint acknowledgements to the JM. Those acks are enqueued in the JM thread's task queue, but may still be processed before the master state snapshot completes. In that case, a single-threaded model does not solve the problem.
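A simplified model of that race, assuming a pending checkpoint that needs both all task acks and the master state before it can complete. Everything runs on one single-threaded executor standing in for the JM main thread, yet the acks are processed before the asynchronous master state completes. If completion is only checked when an ack arrives, the checkpoint never completes; checking again once the master state is done fixes it. `FakePendingCheckpoint` and its methods are hypothetical and only mirror the idea, not Flink's `PendingCheckpoint` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AckRaceSketch {

    // Hypothetical, heavily simplified stand-in for a pending checkpoint.
    static final class FakePendingCheckpoint {
        private final int expectedAcks;
        private final boolean recheckAfterMasterState;
        private int acks;
        private boolean masterStateDone;
        private int completions;

        FakePendingCheckpoint(int expectedAcks, boolean recheckAfterMasterState) {
            this.expectedAcks = expectedAcks;
            this.recheckAfterMasterState = recheckAfterMasterState;
        }

        void acknowledgeTask() {
            acks++;
            maybeComplete();
        }

        void masterStateCompleted() {
            masterStateDone = true;
            // The tasks may already have acked, so check again here.
            if (recheckAfterMasterState) {
                maybeComplete();
            }
        }

        private void maybeComplete() {
            if (masterStateDone && acks == expectedAcks && completions == 0) {
                completions++;
            }
        }
    }

    // Returns how many times the checkpoint was completed (0 means it got stuck).
    static int run(boolean recheckAfterMasterState) {
        // Single thread standing in for the JM main thread.
        ExecutorService jmMainThread = Executors.newSingleThreadExecutor();
        try {
            FakePendingCheckpoint checkpoint = new FakePendingCheckpoint(2, recheckAfterMasterState);
            // Asynchronous master-hook snapshot, completed later.
            CompletableFuture<Void> masterState = new CompletableFuture<>();
            masterState.thenRunAsync(checkpoint::masterStateCompleted, jmMainThread);

            // The hook has already triggered the external system, so both task acks
            // are queued on the JM thread before the master state is done.
            jmMainThread.execute(checkpoint::acknowledgeTask);
            jmMainThread.execute(checkpoint::acknowledgeTask);
            masterState.complete(null);

            // Drain the queue: this no-op runs after every task enqueued above.
            CompletableFuture.runAsync(() -> { }, jmMainThread).join();
            return checkpoint.completions;
        } finally {
            jmMainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completions without re-check: " + run(false));
        System.out.println("completions with re-check: " + run(true));
    }
}
```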



