You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:11 UTC

[flink] 03/06: [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a59feadc65bdc79ca955d5de89a10b6e8453997
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Aug 5 20:37:07 2020 +0800

    [FLINK-18641][checkpointing] Fix CheckpointCoordinator to work with ExternallyInducedSource.
    
    This patch fixes the CheckpointCoordinator to make it work with
    ExternallyInducedSource in cases that the task snapshots are triggered
    by the external systems via master hooks, rather than the checkpoint
    coordinator.
    
    The problem in the current code is that when the task snapshots are
    triggered externally via the master hooks, the checkpoint coordinator
    may receive all the acks from the tasks before the master state snapshot
    completes. And this leads to checkpoint failure. The fix is to only
    finalize the checkpoint when all of the operator coordinator checkpoint,
    master snapshots and task snapshots are fully taken.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  29 +++++-
 .../checkpoint/CheckpointCoordinatorTest.java      | 112 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d7b5cee..4070841 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -574,11 +574,14 @@ public class CheckpointCoordinator {
 										request.advanceToEndOfTime);
 
 									coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+									// It is possible that the tasks has finished checkpointing at this point.
+									// So we need to complete this pending checkpoint.
+									if (!maybeCompleteCheckpoint(checkpoint)) {
+										return null;
+									}
 									onTriggerSuccess();
 								}
 							}
-
 							return null;
 						},
 						timer)
@@ -845,6 +848,26 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	// Returns true if the checkpoint is successfully completed, false otherwise.
+	private boolean maybeCompleteCheckpoint(PendingCheckpoint checkpoint) {
+		synchronized (lock) {
+			if (checkpoint.isFullyAcknowledged()) {
+				try {
+					// we need to check inside the lock for being shutdown as well,
+					// otherwise we get races and invalid error log messages.
+					if (shutdown) {
+						return false;
+					}
+					completePendingCheckpoint(checkpoint);
+				} catch (CheckpointException ce) {
+					onTriggerFailure(checkpoint, ce);
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Handling checkpoints and messages
 	// --------------------------------------------------------------------------------------------
@@ -954,7 +977,7 @@ public class CheckpointCoordinator {
 						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
 							checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
 
-						if (checkpoint.areTasksFullyAcknowledged()) {
+						if (checkpoint.isFullyAcknowledged()) {
 							completePendingCheckpoint(checkpoint);
 						}
 						break;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3207164..eef540c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -63,6 +64,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.verification.VerificationMode;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -79,6 +81,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -2304,6 +2307,115 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+	 * master hooks and finished before the master checkpoint.
+	 */
+	@Test
+	public void testTaskCheckpointTriggeredByMasterHooks() throws Exception {
+		final JobID jobId = new JobID();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(jobId, vertex1, vertex2);
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
+		TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
+		OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
+		OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID2, subtaskState2);
+
+		// Add a master hook which triggers and acks the task checkpoint immediately.
+		// In this case the task checkpoints would complete before the job master checkpoint completes.
+		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+			@Override
+			public String getIdentifier() {
+				return "anything";
+			}
+
+			@Override
+			@Nullable
+			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+				// the master state snapshot completes.
+				checkpointIdRef.set(checkpointId);
+				AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+					jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+				AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+					jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+				return null;
+			}
+
+			@Override
+			public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+
+			}
+
+			@Override
+			public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+				return new SimpleVersionedSerializer<Integer>() {
+					@Override
+					public int getVersion() {
+						return 0;
+					}
+
+					@Override
+					public byte[] serialize(Integer obj) throws IOException {
+						return new byte[0];
+					}
+
+					@Override
+					public Integer deserialize(int version, byte[] serialized) throws IOException {
+						return 1;
+					}
+				};
+			}
+		});
+
+		// Verify initial state.
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+		// trigger the first checkpoint. this should succeed
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
+		assertFalse(checkpointFuture.isCompletedExceptionally());
+
+		// now we should have a completed checkpoint
+		assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		// the canceler should be removed now
+		assertEquals(0, manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+		// validate that the relevant tasks got a confirmation message
+		long checkpointId = checkpointIdRef.get();
+		verify(vertex1.getCurrentExecutionAttempt(),
+			times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
+		verify(vertex2.getCurrentExecutionAttempt(),
+			times(1)).triggerCheckpoint(eq(checkpointId), any(Long.class), any(CheckpointOptions.class));
+
+		CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+		assertEquals(jobId, success.getJobId());
+		assertEquals(2, success.getOperatorStates().size());
+
+		checkpointCoordinator.shutdown(JobStatus.FINISHED);
+	}
+
 	private CheckpointCoordinator getCheckpointCoordinator(
 		JobID jobId,
 		ExecutionVertex vertex1,