You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:14 UTC

[flink] 06/06: [hotfix] Add unit test for checkpoint failure.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7286c662af63626914e0e532203288db615f67c3
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:47:55 2020 +0800

    [hotfix] Add unit test for checkpoint failure.
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 126 +++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 16d7136..78e5336 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -50,6 +53,9 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.ExceptionUtils;
@@ -2436,6 +2442,126 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		checkpointCoordinator.shutdown(JobStatus.FINISHED);
 	}
 
+	@Test
+	public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+		final JobID jobId = new JobID();
+
+		// Create two mock execution vertices whose checkpoint trigger callbacks are no-ops.
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+		ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+			(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+		TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
+		TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
+		OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
+		OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
+		taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+		taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
+
+		// Create a mock OperatorCoordinatorCheckpointContext that records the call and completes the result immediately with an empty byte array.
+		AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
+		OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
+			new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+				.setOnCallingCheckpointCoordinator((checkpointId, result) -> {
+					coordCheckpointDone.set(true);
+					result.complete(new byte[0]);
+				})
+				.setOperatorID(opID1)
+				.build();
+
+		// Set up the checkpoint coordinator with a state backend whose checkpoint storage location fails on metadata stream creation.
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
+			.setJobId(jobId)
+			.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
+			.setCheckpointCoordinatorConfiguration(
+				CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+			.setTimer(manuallyTriggeredScheduledExecutor)
+			.setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
+			.setStateBackEnd(new MemoryStateBackend() {
+				private static final long serialVersionUID = 8134582566514272546L;
+
+				// Return a storage whose locations throw an IOException from createMetadataOutputStream().
+				@Override
+				public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+					return new MemoryBackendCheckpointStorage(jobId, null, null, 100) {
+						@Override
+						public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+							return new NonPersistentMetadataCheckpointStorageLocation(1000) {
+								@Override
+								public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
+									throw new IOException("Artificial Exception");
+								}
+							};
+						}
+					};
+				}
+			})
+			.build();
+		AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+		// Add a master hook which triggers and acks the task checkpoint immediately.
+		// In this case the task checkpoints would complete before the job master checkpoint completes.
+		checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+			@Override
+			public String getIdentifier() {
+				return "anything";
+			}
+
+			@Override
+			@Nullable
+			public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
+				// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+				// the master state snapshot completes.
+				checkpointIdRef.set(checkpointId);
+				AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+					jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+				AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+					jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+				checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+				return null;
+			}
+
+			@Override
+			public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+				// Intentionally empty: this hook has nothing to restore.
+			}
+
+			@Override
+			public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+				return new SimpleVersionedSerializer<Integer>() {
+					@Override
+					public int getVersion() {
+						return 0;
+					}
+
+					@Override
+					public byte[] serialize(Integer obj) throws IOException {
+						return new byte[0];
+					}
+
+					@Override
+					public Integer deserialize(int version, byte[] serialized) throws IOException {
+						return 1;
+					}
+				};
+			}
+		});
+
+		// Trigger a checkpoint. It is expected to fail, as asserted below, because the metadata output stream cannot be created.
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
+
+		assertTrue(checkpointFuture.isCompletedExceptionally());
+		assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+	}
+
 	private CheckpointCoordinator getCheckpointCoordinator(
 		JobID jobId,
 		ExecutionVertex vertex1,