You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:12 UTC

[flink] 04/06: [FLINK-18641][runtime/checkpointing] Checkpoint the operator coordinators before triggering the master hooks.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c06e2c8245a945c31024d673da9278160df20ad6
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Aug 17 13:55:25 2020 +0800

    [FLINK-18641][runtime/checkpointing] Checkpoint the operator coordinators before triggering the master hooks.
    
    We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
    This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
    ExternallyInducedSource is used.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  16 ++-
 .../checkpoint/CheckpointCoordinatorTest.java      |  34 +++++--
 .../CheckpointCoordinatorTestingUtils.java         | 111 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4070841..fab27a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -528,15 +528,25 @@ public class CheckpointCoordinator {
 							request.getOnCompletionFuture()),
 						timer);
 
-			final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-					.thenCompose(this::snapshotMasterState);
-
 			final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 					.thenComposeAsync((pendingCheckpoint) ->
 							OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 									coordinatorsToCheckpoint, pendingCheckpoint, timer),
 							timer);
 
+			// We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+			// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+			// ExternallyInducedSource is used.
+			final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+				.thenCompose(ignored -> {
+					// If the code reaches here, the pending checkpoint is guaranteed to be not null.
+					// We use FutureUtils.getWithoutException() to make compiler happy with checked
+					// exceptions in the signature.
+					PendingCheckpoint checkpoint =
+						FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+					return snapshotMasterState(checkpoint);
+				});
+
 			FutureUtils.assertNoException(
 				CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
 					.handleAsync(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index eef540c..16d7136 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -86,6 +86,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -2309,10 +2310,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 	/**
 	 * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
-	 * master hooks and finished before the master checkpoint.
+	 * master hooks and finished before the master checkpoint. Also make sure that the operator
+	 * coordinators are checkpointed before starting the task checkpoint.
 	 */
 	@Test
-	public void testTaskCheckpointTriggeredByMasterHooks() throws Exception {
+	public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
 		final JobID jobId = new JobID();
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
@@ -2323,10 +2325,6 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
 			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
 
-		// set up the coordinator and validate the initial state
-		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
-		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
-
 		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
 		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
 		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
@@ -2336,6 +2334,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
 		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID2, subtaskState2);
 
+		// Create a mock OperatorCoordinatorCheckpointContext which completes the checkpoint immediately.
+		AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
+		OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
+			new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+				.setOnCallingCheckpointCoordinator((checkpointId, result) -> {
+					coordCheckpointDone.set(true);
+					result.complete(new byte[0]);
+				})
+				.setOperatorID(opID1)
+				.build();
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
+			.setJobId(jobId)
+			.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
+			.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+			.setTimer(manuallyTriggeredScheduledExecutor)
+			.setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
+			.build();
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
 		// Add a master hook which triggers and acks the task checkpoint immediately.
 		// In this case the task checkpoints would complete before the job master checkpoint completes.
 		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
@@ -2347,6 +2366,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			@Override
 			@Nullable
 			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
 				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
 				// the master state snapshot completes.
 				checkpointIdRef.set(checkpointId);
@@ -2393,7 +2413,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// trigger the first checkpoint. this should succeed
 		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		// now we should have a completed checkpoint
 		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 632a84a..df1395f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -74,7 +74,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -652,8 +655,10 @@ public class CheckpointCoordinatorTestingUtils {
 			return this;
 		}
 
-		public void setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
+		public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(
+				Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
 			this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
+			return this;
 		}
 
 		public CheckpointCoordinatorBuilder setCheckpointIDCounter(
@@ -695,6 +700,11 @@ public class CheckpointCoordinatorTestingUtils {
 			return this;
 		}
 
+		public CheckpointCoordinatorBuilder setStateBackEnd(StateBackend stateBackEnd) {
+			this.checkpointStateBackend = stateBackEnd;
+			return this;
+		}
+
 		public CheckpointCoordinator build() {
 			return new CheckpointCoordinator(
 				jobId,
@@ -738,4 +748,103 @@ public class CheckpointCoordinatorTestingUtils {
 			return new String(serialized, StandardCharsets.UTF_8);
 		}
 	}
+
+	// ----------------- Mock class builders ---------------
+	/** Fluent builder for {@link MockOperatorCoordinatorCheckpointContext}; callbacks and operator ID that are never set are handed to the mock as {@code null}. */
+	public static final class MockOperatorCheckpointCoordinatorContextBuilder {
+		private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null;
+		private Consumer<Long> onCallingAfterSourceBarrierInjection = null;
+		private OperatorID operatorID = null;
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingCheckpointCoordinator(
+				BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator) {
+			this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
+			return this;
+		}
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingAfterSourceBarrierInjection(
+				Consumer<Long> onCallingAfterSourceBarrierInjection) {
+			this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
+			return this;
+		}
+
+		public MockOperatorCheckpointCoordinatorContextBuilder setOperatorID(OperatorID operatorID) {
+			this.operatorID = operatorID;
+			return this;
+		}
+
+		public MockOperatorCoordinatorCheckpointContext build() {
+			return new MockOperatorCoordinatorCheckpointContext(
+				onCallingCheckpointCoordinator,
+				onCallingAfterSourceBarrierInjection,
+				operatorID);
+		}
+	}
+
+	// ----------------- Mock classes --------------------
+
+	/**
+	 * Mock {@link OperatorCoordinatorCheckpointContext} built by {@link MockOperatorCheckpointCoordinatorContextBuilder}.
+	 * Calls are forwarded to the optional callbacks; without a checkpoint callback the result future is left untouched.
+	 */
+	public static final class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
+		private final BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator;
+		private final Consumer<Long> onCallingAfterSourceBarrierInjection;
+		private final OperatorID operatorID;
+
+		private MockOperatorCoordinatorCheckpointContext(
+				BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator,
+				Consumer<Long> onCallingAfterSourceBarrierInjection,
+				OperatorID operatorID) {
+			 this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
+			 this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
+			 this.operatorID = operatorID;
+		}
+
+		@Override
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			if (onCallingCheckpointCoordinator != null) {
+				onCallingCheckpointCoordinator.accept(checkpointId, result);
+			}
+		}
+
+		@Override
+		public void afterSourceBarrierInjection(long checkpointId) {
+			if (onCallingAfterSourceBarrierInjection != null) {
+				onCallingAfterSourceBarrierInjection.accept(checkpointId);
+			}
+		}
+
+		@Override
+		public void abortCurrentTriggering() {
+			// Intentionally empty: this mock does nothing when a triggering is aborted.
+		}
+
+		@Override
+		public void checkpointComplete(long checkpointId) {
+			// Intentionally empty: this mock ignores checkpoint completion notifications.
+		}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+			// Intentionally empty: this mock keeps no state, so the given data is ignored.
+		}
+
+		@Override
+		public OperatorID operatorId() {
+			return operatorID;
+		}
+
+		@Override
+		public int maxParallelism() {
+			return 1; // the mock always reports a fixed value of 1
+		}
+
+		@Override
+		public int currentParallelism() {
+			return 1; // the mock always reports a fixed value of 1
+		}
+	}
+
+
 }