You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:53 UTC
[flink] 04/13: [FLINK-17670][checkpointing] Savepoint handling of
"non-restored" state also takes OperatorCoordinator state into account.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 382cf09bbfb4e383e373af8ebf1a9dee13b8a632
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 16:10:49 2020 +0200
[FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.
The savepoint restore now also fails when there is unmatched state from an OperatorCoordinator only,
and non-restored state is not allowed.
---
.../flink/runtime/checkpoint/Checkpoints.java | 24 ++++++++++++++--------
.../checkpoint/CheckpointMetadataLoadingTest.java | 23 +++++++++++++++++++++
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 6e23131..23df694 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -171,16 +171,13 @@ public class Checkpoints {
} else if (allowNonRestoredState) {
LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
} else {
+ if (operatorState.getCoordinatorState() != null) {
+ throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
+ }
+
for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
if (operatorSubtaskState.hasState()) {
- String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
- "Cannot map checkpoint/savepoint state for operator %s to the new program, " +
- "because the operator is not available in the new program. If " +
- "you want to allow to skip this, you can set the --allowNonRestoredState " +
- "option on the CLI.",
- checkpointPointer, operatorState.getOperatorID());
-
- throw new IllegalStateException(msg);
+ throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
}
}
@@ -202,6 +199,17 @@ public class Checkpoints {
location);
}
+ private static void throwNonRestoredStateException(String checkpointPointer, OperatorID operatorId) {
+ String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+ "Cannot map checkpoint/savepoint state for operator %s to the new program, " +
+ "because the operator is not available in the new program. If " +
+ "you want to allow to skip this, you can set the --allowNonRestoredState " +
+ "option on the CLI.",
+ checkpointPointer, operatorId);
+
+ throw new IllegalStateException(msg);
+ }
+
// ------------------------------------------------------------------------
// Savepoint Disposal Hooks
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 936fe23..987d77d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -129,6 +129,29 @@ public class CheckpointMetadataLoadingTest {
assertTrue(loaded.getOperatorStates().isEmpty());
}
+ /**
+ * Tests that savepoint loading fails when there is non-restored coordinator state only,
+ * and non-restored state is not allowed.
+ */
+ @Test
+ public void testUnmatchedCoordinatorOnlyStateFails() throws Exception {
+ final OperatorID operatorID = new OperatorID();
+ final int maxParallelism = 1234;
+
+ final OperatorState state = new OperatorState(operatorID, maxParallelism / 2, maxParallelism);
+ state.setCoordinatorState(new ByteStreamStateHandle("coordinatorState", new byte[0]));
+
+ final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorState(42L, state);
+ final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
+
+ try {
+ Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+ fail("Did not throw expected Exception");
+ } catch (IllegalStateException expected) {
+ assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+ }
+ }
+
// ------------------------------------------------------------------------
// setup utils
// ------------------------------------------------------------------------