You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:13 UTC

[flink] 05/06: [hotfix] Make it more clear that the master hooks are also fired in the checkpoint timer thread.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bdd2ca151db0a7a2f4f1d675859361171e4001ab
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Aug 17 13:58:24 2020 +0800

    [hotfix] Make it more clear that the master hooks are also fired in the checkpoint timer thread.
---
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fab27a1..26cf391 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -538,14 +538,14 @@ public class CheckpointCoordinator {
 			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
 			// ExternallyInducedSource is used.
 			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
-				.thenCompose(ignored -> {
+				.thenComposeAsync(ignored -> {
 					// If the code reaches here, the pending checkpoint is guaranteed to be not null.
 					// We use FutureUtils.getWithoutException() to make compiler happy with checked
 					// exceptions in the signature.
 					PendingCheckpoint checkpoint =
 						FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
 					return snapshotMasterState(checkpoint);
-				});
+				}, timer);
 
 			FutureUtils.assertNoException(
 				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)