You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:14 UTC
[flink] 06/06: [hotfix] Add unit test for checkpoint failure.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7286c662af63626914e0e532203288db615f67c3
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 22:47:55 2020 +0800
[hotfix] Add unit test for checkpoint failure.
---
.../checkpoint/CheckpointCoordinatorTest.java | 126 +++++++++++++++++++++
1 file changed, 126 insertions(+)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 16d7136..78e5336 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -50,6 +53,9 @@ import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.util.ExceptionUtils;
@@ -2436,6 +2442,126 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.shutdown(JobStatus.FINISHED);
}
+ @Test
+ public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+ final JobID jobId = new JobID();
+
+ // create some mock Execution vertices that receive the checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
+ (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+ ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
+ (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+ OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+ OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+ TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
+ TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
+ OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
+ OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
+ taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+ taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
+
+ // Create a mock OperatorCoordinatorCheckpointContext which completes the checkpoint immediately.
+ AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
+ OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
+ new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+ .setOnCallingCheckpointCoordinator((checkpointId, result) -> {
+ coordCheckpointDone.set(true);
+ result.complete(new byte[0]);
+ })
+ .setOperatorID(opID1)
+ .build();
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
+ .setJobId(jobId)
+ .setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
+ .setCheckpointCoordinatorConfiguration(
+ CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
+ .setStateBackEnd(new MemoryStateBackend() {
+ private static final long serialVersionUID = 8134582566514272546L;
+
+ // Throw exception when finalizing the checkpoint.
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+ return new MemoryBackendCheckpointStorage(jobId, null, null, 100) {
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+ return new NonPersistentMetadataCheckpointStorageLocation(1000) {
+ @Override
+ public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
+ throw new IOException("Artificial Exception");
+ }
+ };
+ }
+ };
+ }
+ })
+ .build();
+ AtomicReference<Long> checkpointIdRef = new AtomicReference<>();
+
+ // Add a master hook which triggers and acks the task checkpoint immediately.
+ // In this case the task checkpoints would complete before the job master checkpoint completes.
+ checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
+ @Override
+ public String getIdentifier() {
+ return "anything";
+ }
+
+ @Override
+ @Nullable
+ public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+ assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
+ // Acknowledge the checkpoint in the master hooks so the task snapshots complete before
+ // the master state snapshot completes.
+ checkpointIdRef.set(checkpointId);
+ AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+ jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
+ AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+ jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
+ return null;
+ }
+
+ @Override
+ public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
+
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
+ return new SimpleVersionedSerializer<Integer>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Integer obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public Integer deserialize(int version, byte[] serialized) throws IOException {
+ return 1;
+ }
+ };
+ }
+ });
+
+ // trigger the checkpoint. this should fail because finalizing the checkpoint throws an IOException
+ final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ assertTrue(checkpointFuture.isCompletedExceptionally());
+ assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+ }
+
private CheckpointCoordinator getCheckpointCoordinator(
JobID jobId,
ExecutionVertex vertex1,