You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:53 UTC

[flink] 04/13: [FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 382cf09bbfb4e383e373af8ebf1a9dee13b8a632
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 16:10:49 2020 +0200

    [FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.
    
    The savepoint restore now also fails when the only unmatched state comes from an OperatorCoordinator
    and non-restored state is not allowed.
---
 .../flink/runtime/checkpoint/Checkpoints.java      | 24 ++++++++++++++--------
 .../checkpoint/CheckpointMetadataLoadingTest.java  | 23 +++++++++++++++++++++
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 6e23131..23df694 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -171,16 +171,13 @@ public class Checkpoints {
 			} else if (allowNonRestoredState) {
 				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
 			} else {
+				if (operatorState.getCoordinatorState() != null) {
+					throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
+				}
+
 				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
 					if (operatorSubtaskState.hasState()) {
-						String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
-										"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
-										"because the operator is not available in the new program. If " +
-										"you want to allow to skip this, you can set the --allowNonRestoredState " +
-										"option on the CLI.",
-								checkpointPointer, operatorState.getOperatorID());
-
-						throw new IllegalStateException(msg);
+						throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
 					}
 				}
 
@@ -202,6 +199,17 @@ public class Checkpoints {
 				location);
 	}
 
+	private static void throwNonRestoredStateException(String checkpointPointer, OperatorID operatorId) { // Always throws; never returns normally.
+		String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " + // Shared message for coordinator-state and subtask-state mismatches.
+				"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
+				"because the operator is not available in the new program. If " +
+				"you want to allow to skip this, you can set the --allowNonRestoredState " +
+				"option on the CLI.",
+			checkpointPointer, operatorId);
+
+		throw new IllegalStateException(msg); // Unchecked, so call sites need no throws clause.
+	}
+
 	// ------------------------------------------------------------------------
 	//  Savepoint Disposal Hooks
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 936fe23..987d77d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -129,6 +129,29 @@ public class CheckpointMetadataLoadingTest {
 		assertTrue(loaded.getOperatorStates().isEmpty());
 	}
 
+	/**
+	 * Tests that savepoint loading fails when there is non-restored coordinator state only,
+	 * and non-restored state is not allowed.
+	 */
+	@Test
+	public void testUnmatchedCoordinatorOnlyStateFails() throws Exception {
+		final OperatorID operatorID = new OperatorID();
+		final int maxParallelism = 1234;
+
+		final OperatorState state = new OperatorState(operatorID, maxParallelism / 2, maxParallelism); // No subtask states are added to this OperatorState.
+		state.setCoordinatorState(new ByteStreamStateHandle("coordinatorState", new byte[0])); // Non-null (even if empty) coordinator state must still trigger the failure.
+
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorState(42L, state);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap(); // Empty: no vertex can match operatorID.
+
+		try {
+			Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false); // NOTE(review): last arg presumably allowNonRestoredState=false; confirm against signature.
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("allowNonRestoredState")); // The failure message points users at the CLI flag.
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  setup utils
 	// ------------------------------------------------------------------------