You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 06:34:56 UTC
[2/2] flink git commit: [FLINK-4445] [checkpointing] Add option to
allow non restored checkpoint state
[FLINK-4445] [checkpointing] Add option to allow non restored checkpoint state
Allows skipping checkpoint state that cannot be mapped to a job vertex when
restoring.
This closes #2712.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0e620f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0e620f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0e620f0
Branch: refs/heads/master
Commit: c0e620f0ace0aa3500a5642e7165cf9f05e81f6a
Parents: 74c0770
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Oct 26 18:05:26 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 07:34:21 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 27 ++-
.../runtime/checkpoint/CompletedCheckpoint.java | 10 ++
.../checkpoint/StateAssignmentOperation.java | 32 ++--
.../checkpoint/savepoint/SavepointLoader.java | 18 +-
.../runtime/executiongraph/ExecutionGraph.java | 10 +-
.../jobgraph/SavepointRestoreSettings.java | 34 ++--
.../flink/runtime/jobmanager/JobManager.scala | 15 +-
.../checkpoint/CheckpointCoordinatorTest.java | 8 +-
.../checkpoint/CheckpointStateRestoreTest.java | 177 ++++++++++---------
.../savepoint/SavepointLoaderTest.java | 13 +-
.../runtime/jobmanager/JobManagerTest.java | 158 ++++++++++++++++-
.../test/checkpointing/RescalingITCase.java | 9 +-
.../test/checkpointing/SavepointITCase.java | 7 +-
13 files changed, 361 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 01cd37e..26702c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.TaskStateHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -760,10 +761,32 @@ public class CheckpointCoordinator {
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
+ /**
+ * Restores the latest checkpointed state.
+ *
+ * @param tasks Map of job vertices to restore. State for these vertices is
+ * restored via {@link Execution#setInitialState(TaskStateHandles)}.
+ * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
+ * restore from.
+ * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
+ * to any job vertex in tasks.
+ * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+ * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+ * @throws IllegalStateException If no completed checkpoint is available and
* the <code>errorIfNoCheckpoint</code> flag has been set.
+ * @throws IllegalStateException If the checkpoint contains state that cannot be
+ * mapped to any job vertex in <code>tasks</code> and the
+ * <code>allowNonRestoredState</code> flag has not been set.
+ * @throws IllegalStateException If the max parallelism changed for an operator
+ * that restores state from this checkpoint.
+ * @throws IllegalStateException If the parallelism changed for an operator
+ * that restores <i>non-partitioned</i> state from this
+ * checkpoint.
+ */
public boolean restoreLatestCheckpointedState(
Map<JobVertexID, ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint,
- boolean allOrNothingState) throws Exception {
+ boolean allowNonRestoredState) throws Exception {
synchronized (lock) {
if (shutdown) {
@@ -787,7 +810,7 @@ public class CheckpointCoordinator {
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
StateAssignmentOperation stateAssignmentOperation =
- new StateAssignmentOperation(tasks, latest, allOrNothingState);
+ new StateAssignmentOperation(LOG, tasks, latest, allowNonRestoredState);
stateAssignmentOperation.assignStates();
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e135272..3c33ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -70,6 +70,16 @@ public class CompletedCheckpoint implements Serializable {
long checkpointID,
long timestamp,
long completionTimestamp,
+ Map<JobVertexID, TaskState> taskStates) {
+
+ this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null);
+ }
+
+ public CompletedCheckpoint(
+ JobID job,
+ long checkpointID,
+ long timestamp,
+ long completionTimestamp,
Map<JobVertexID, TaskState> taskStates,
CheckpointProperties props,
String externalPath) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 8e2b0bf..d98c8e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
@@ -42,20 +43,23 @@ import java.util.Map;
*/
public class StateAssignmentOperation {
+ private final Logger logger;
+ private final Map<JobVertexID, ExecutionJobVertex> tasks;
+ private final CompletedCheckpoint latest;
+ private final boolean allowNonRestoredState;
+
public StateAssignmentOperation(
+ Logger logger,
Map<JobVertexID, ExecutionJobVertex> tasks,
CompletedCheckpoint latest,
- boolean allOrNothingState) {
+ boolean allowNonRestoredState) {
+ this.logger = logger;
this.tasks = tasks;
this.latest = latest;
- this.allOrNothingState = allOrNothingState;
+ this.allowNonRestoredState = allowNonRestoredState;
}
- private final Map<JobVertexID, ExecutionJobVertex> tasks;
- private final CompletedCheckpoint latest;
- private final boolean allOrNothingState;
-
public boolean assignStates() throws Exception {
for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
@@ -101,15 +105,10 @@ public class StateAssignmentOperation {
List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
- int counter = 0;
for (int p = 0; p < oldParallelism; ++p) {
-
SubtaskState subtaskState = taskState.getState(p);
if (null != subtaskState) {
-
- ++counter;
-
collectParallelStatesByChainOperator(
parallelOpStatesBackend, subtaskState.getManagedOperatorState());
@@ -128,11 +127,6 @@ public class StateAssignmentOperation {
}
}
- if (allOrNothingState && counter > 0 && counter < oldParallelism) {
- throw new IllegalStateException("The checkpoint contained state only for " +
- "a subset of tasks for vertex " + executionJobVertex);
- }
-
// operator chain index -> lists with collected states (one collection for each parallel subtasks)
@SuppressWarnings("unchecked")
List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
@@ -222,7 +216,8 @@ public class StateAssignmentOperation {
currentExecutionAttempt.setInitialState(taskStateHandles);
}
-
+ } else if (allowNonRestoredState) {
+ logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
} else {
throw new IllegalStateException("There is no execution job vertex for the job" +
" vertex ID " + taskGroupStateEntry.getKey());
@@ -230,7 +225,6 @@ public class StateAssignmentOperation {
}
return true;
-
}
/**
@@ -326,4 +320,4 @@ public class StateAssignmentOperation {
return repackStream;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 845008d..1819120 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@@ -34,6 +36,8 @@ import java.util.Map;
*/
public class SavepointLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(SavepointLoader.class);
+
/**
* Loads a savepoint back as a {@link CompletedCheckpoint}.
*
@@ -42,6 +46,8 @@ public class SavepointLoader {
* @param jobId The JobID of the job to load the savepoint for.
* @param tasks Tasks that will possibly be reset
* @param savepointPath The path of the savepoint to rollback to
+ * @param allowNonRestoredState Allow skipping checkpoint state that cannot be mapped
+ * to any job vertex in tasks.
*
* @throws IllegalStateException If mismatch between program and savepoint state
* @throws Exception If savepoint store failure
@@ -49,7 +55,8 @@ public class SavepointLoader {
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
- String savepointPath) throws IOException {
+ String savepointPath,
+ boolean allowNonRestoredState) throws IOException {
// (1) load the savepoint
Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
@@ -76,11 +83,14 @@ public class SavepointLoader {
throw new IllegalStateException(msg);
}
+ } else if (allowNonRestoredState) {
+ LOG.info("Skipping savepoint state for operator {}.", taskState.getJobVertexID());
} else {
String msg = String.format("Failed to rollback to savepoint %s. " +
- "Cannot map old state for task %s to the new program. " +
- "This indicates that the program has been changed in a " +
- "non-compatible way after the savepoint.",
+ "Cannot map savepoint state for operator %s to the new program, " +
+ "because the operator is not available in the new program. If " +
+ "you want to allow to skip this, you can set the --allowNonRestoredState " +
+ "option on the CLI.",
savepointPath, taskState.getJobVertexID());
throw new IllegalStateException(msg);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 244a113..8a4f3ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,12 +911,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*
* <p>The recovery of checkpoints might block. Make sure that calls to this method don't
* block the job manager actor and run asynchronously.
- *
+ *
+ * @param errorIfNoCheckpoint Fail if there is no checkpoint available
+ * @param allowNonRestoredState Allow skipping checkpoint state that cannot be mapped
* to the ExecutionGraph vertices (if the checkpoint contains state for a
+ * job vertex that is not part of this ExecutionGraph).
*/
- public void restoreLatestCheckpointedState() throws Exception {
+ public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
synchronized (progressLock) {
if (checkpointCoordinator != null) {
- checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+ checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), errorIfNoCheckpoint, allowNonRestoredState);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 48d3997..10119bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -33,26 +33,26 @@ public class SavepointRestoreSettings implements Serializable {
private final static SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
/** By default, be strict when restoring from a savepoint. */
- private final static boolean DEFAULT_IGNORE_UNMAPPED_STATE = false;
+ private final static boolean DEFAULT_ALLOW_NON_RESTORED_STATE = false;
/** Savepoint restore path. */
private final String restorePath;
/**
- * Flag indicating whether the restore should ignore if the savepoint contains
- * state for an operator that is not part of the job.
+ * Flag indicating whether non restored state is allowed if the savepoint
+ * contains state for an operator that is not part of the job.
*/
- private final boolean ignoreUnmappedState;
+ private final boolean allowNonRestoredState;
/**
* Creates the restore settings.
*
* @param restorePath Savepoint restore path.
- * @param ignoreUnmappedState Ignore unmapped state.
+ * @param allowNonRestoredState Allow skipping savepoint state that cannot be mapped to the job.
*/
- private SavepointRestoreSettings(String restorePath, boolean ignoreUnmappedState) {
+ private SavepointRestoreSettings(String restorePath, boolean allowNonRestoredState) {
this.restorePath = restorePath;
- this.ignoreUnmappedState = ignoreUnmappedState;
+ this.allowNonRestoredState = allowNonRestoredState;
}
/**
@@ -73,14 +73,14 @@ public class SavepointRestoreSettings implements Serializable {
}
/**
- * Returns whether the restore should ignore whether the savepoint contains
- * state that cannot be mapped to the job.
+ * Returns whether non restored state is allowed if the savepoint contains
+ * state that cannot be mapped back to the job.
*
- * @return <code>true</code> if restore should ignore whether the savepoint contains
- * state that cannot be mapped to the job.
+ * @return <code>true</code> if non restored state is allowed if the savepoint
+ * contains state that cannot be mapped back to the job.
*/
- public boolean ignoreUnmappedState() {
- return ignoreUnmappedState;
+ public boolean allowNonRestoredState() {
+ return allowNonRestoredState;
}
@Override
@@ -88,7 +88,7 @@ public class SavepointRestoreSettings implements Serializable {
if (restoreSavepoint()) {
return "SavepointRestoreSettings.forPath(" +
"restorePath='" + restorePath + '\'' +
- ", ignoreUnmappedState=" + ignoreUnmappedState +
+ ", allowNonRestoredState=" + allowNonRestoredState +
')';
} else {
return "SavepointRestoreSettings.none()";
@@ -102,12 +102,12 @@ public class SavepointRestoreSettings implements Serializable {
}
public static SavepointRestoreSettings forPath(String savepointPath) {
- return forPath(savepointPath, DEFAULT_IGNORE_UNMAPPED_STATE);
+ return forPath(savepointPath, DEFAULT_ALLOW_NON_RESTORED_STATE);
}
- public static SavepointRestoreSettings forPath(String savepointPath, boolean ignoreUnmappedState) {
+ public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState) {
checkNotNull(savepointPath, "Savepoint restore path.");
- return new SavepointRestoreSettings(savepointPath, ignoreUnmappedState);
+ return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 49f7f3f..9cc8be6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1317,17 +1317,14 @@ class JobManager(
if (savepointSettings.restoreSavepoint()) {
try {
val savepointPath = savepointSettings.getRestorePath()
- val ignoreUnmapped = savepointSettings.ignoreUnmappedState()
+ val allowNonRestored = savepointSettings.allowNonRestoredState()
- if (ignoreUnmapped) {
- log.info(s"Starting job from savepoint '$savepointPath' (ignore unmapped state).")
- } else {
- log.info(s"Starting job from savepoint '$savepointPath'.")
- }
+ log.info(s"Starting job from savepoint '$savepointPath'" +
+ (if (allowNonRestored) " (allowing non restored state)" else "") + ".")
// load the savepoint as a checkpoint into the system
val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
- jobId, executionGraph.getAllVertices, savepointPath, ignoreUnmapped)
+ jobId, executionGraph.getAllVertices, savepointPath, allowNonRestored)
executionGraph.getCheckpointCoordinator.getCheckpointStore
.addCheckpoint(savepoint)
@@ -1338,9 +1335,11 @@ class JobManager(
executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
.setCount(nextCheckpointId)
- executionGraph.restoreLatestCheckpointedState(true, ignoreUnmapped)
+ executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
} catch {
case e: Exception =>
+ jobInfo.notifyClients(
+ decorateMessage(JobResultFailure(new SerializedThrowable(e))))
throw new SuppressRestartsException(e)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ab41113..b874612 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1868,7 +1868,7 @@ public class CheckpointCoordinatorTest {
tasks.put(jobVertexID1, jobVertex1);
tasks.put(jobVertexID2, jobVertex2);
- coord.restoreLatestCheckpointedState(tasks, true, true);
+ coord.restoreLatestCheckpointedState(tasks, true, false);
// verify the restored state
verifiyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
@@ -1984,7 +1984,7 @@ public class CheckpointCoordinatorTest {
tasks.put(jobVertexID1, newJobVertex1);
tasks.put(jobVertexID2, newJobVertex2);
- coord.restoreLatestCheckpointedState(tasks, true, true);
+ coord.restoreLatestCheckpointedState(tasks, true, false);
fail("The restoration should have failed because the max parallelism changed.");
}
@@ -2106,7 +2106,7 @@ public class CheckpointCoordinatorTest {
tasks.put(jobVertexID1, newJobVertex1);
tasks.put(jobVertexID2, newJobVertex2);
- coord.restoreLatestCheckpointedState(tasks, true, true);
+ coord.restoreLatestCheckpointedState(tasks, true, false);
fail("The restoration should have failed because the parallelism of an vertex with " +
"non-partitioned state changed.");
@@ -2249,7 +2249,7 @@ public class CheckpointCoordinatorTest {
tasks.put(jobVertexID1, newJobVertex1);
tasks.put(jobVertexID2, newJobVertex2);
- coord.restoreLatestCheckpointedState(tasks, true, true);
+ coord.restoreLatestCheckpointedState(tasks, true, false);
// verify the restored state
verifiyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 359262f..6e5279b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -59,6 +59,9 @@ import static org.mockito.Mockito.when;
*/
public class CheckpointStateRestoreTest {
+ /**
+ * Tests that on restore the task state is reset for each stateful task.
+ */
@Test
public void testSetState() {
try {
@@ -93,7 +96,6 @@ public class CheckpointStateRestoreTest {
map.put(statefulId, stateful);
map.put(statelessId, stateless);
-
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
200000L,
@@ -167,92 +169,6 @@ public class CheckpointStateRestoreTest {
}
@Test
- public void testStateOnlyPartiallyAvailable() {
- try {
- final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
- KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
- List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
- final KeyGroupsStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
-
- final JobID jid = new JobID();
- final JobVertexID statefulId = new JobVertexID();
- final JobVertexID statelessId = new JobVertexID();
-
- Execution statefulExec1 = mockExecution();
- Execution statefulExec2 = mockExecution();
- Execution statefulExec3 = mockExecution();
- Execution statelessExec1 = mockExecution();
- Execution statelessExec2 = mockExecution();
-
- ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0, 3);
- ExecutionVertex stateful2 = mockExecutionVertex(statefulExec2, statefulId, 1, 3);
- ExecutionVertex stateful3 = mockExecutionVertex(statefulExec3, statefulId, 2, 3);
- ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0, 2);
- ExecutionVertex stateless2 = mockExecutionVertex(statelessExec2, statelessId, 1, 2);
-
- ExecutionJobVertex stateful = mockExecutionJobVertex(statefulId,
- new ExecutionVertex[] { stateful1, stateful2, stateful3 });
- ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
- new ExecutionVertex[] { stateless1, stateless2 });
-
- Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
- map.put(statefulId, stateful);
- map.put(statelessId, stateless);
-
-
- CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- 200000L,
- 200000L,
- 0,
- Integer.MAX_VALUE,
- ExternalizedCheckpointSettings.none(),
- new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[0],
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(1),
- null,
- new DisabledCheckpointStatsTracker());
-
- // create ourselves a checkpoint with state
- final long timestamp = 34623786L;
- coord.triggerCheckpoint(timestamp, false);
-
- PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
- final long checkpointId = pending.getCheckpointId();
-
- // the difference to the test "testSetState" is that one stateful subtask does not report state
- SubtaskState checkpointStateHandles =
- new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null, 0L);
-
- CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
-
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
-
- // let the coordinator inject the state
- try {
- coord.restoreLatestCheckpointedState(map, true, true);
- fail("this should fail with an exception");
- }
- catch (IllegalStateException e) {
- // swish!
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
public void testNoCheckpointAvailable() {
try {
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -284,6 +200,93 @@ public class CheckpointStateRestoreTest {
}
}
+ /**
+ * Tests that the allow non restored state flag is correctly handled.
+ *
+ * <p>The flag only applies for state that is part of the checkpoint.
+ */
+ @Test
+ public void testNonRestoredState() throws Exception {
+ // --- (1) Create tasks to restore checkpoint with ---
+ JobVertexID jobVertexId1 = new JobVertexID();
+ JobVertexID jobVertexId2 = new JobVertexID();
+
+ // 1st JobVertex
+ ExecutionVertex vertex11 = mockExecutionVertex(mockExecution(), jobVertexId1, 0, 3);
+ ExecutionVertex vertex12 = mockExecutionVertex(mockExecution(), jobVertexId1, 1, 3);
+ ExecutionVertex vertex13 = mockExecutionVertex(mockExecution(), jobVertexId1, 2, 3);
+ // 2nd JobVertex
+ ExecutionVertex vertex21 = mockExecutionVertex(mockExecution(), jobVertexId2, 0, 2);
+ ExecutionVertex vertex22 = mockExecutionVertex(mockExecution(), jobVertexId2, 1, 2);
+
+ ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexId1, new ExecutionVertex[] { vertex11, vertex12, vertex13 });
+ ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexId2, new ExecutionVertex[] { vertex21, vertex22 });
+
+ Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+ tasks.put(jobVertexId1, jobVertex1);
+ tasks.put(jobVertexId2, jobVertex2);
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ new ExecutionVertex[] {},
+ new ExecutionVertex[] {},
+ new ExecutionVertex[] {},
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ null,
+ new DisabledCheckpointStatsTracker());
+
+ ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest
+ .generateChainedStateHandle(new SerializableObject());
+
+ // --- (2) Checkpoint misses state for a jobVertex (should work) ---
+ Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();
+ {
+ TaskState taskState = new TaskState(jobVertexId1, 3, 3, 1);
+ taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
+ taskState.putState(1, new SubtaskState(serializedState, null, null, null, null));
+ taskState.putState(2, new SubtaskState(serializedState, null, null, null, null));
+
+ checkpointTaskStates.put(jobVertexId1, taskState);
+ }
+ CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 1, 2, new HashMap<>(checkpointTaskStates));
+
+ coord.getCheckpointStore().addCheckpoint(checkpoint);
+
+ coord.restoreLatestCheckpointedState(tasks, true, false);
+ coord.restoreLatestCheckpointedState(tasks, true, true);
+
+ // --- (3) JobVertex missing for task state that is part of the checkpoint ---
+ JobVertexID newJobVertexID = new JobVertexID();
+
+ // There is no task for newJobVertexID; its TaskState must carry that same ID (as in a real checkpoint)
+ {
+ TaskState taskState = new TaskState(newJobVertexID, 1, 1, 1);
+ taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
+
+ checkpointTaskStates.put(newJobVertexID, taskState);
+ }
+
+ checkpoint = new CompletedCheckpoint(new JobID(), 1, 2, 3, new HashMap<>(checkpointTaskStates));
+
+ coord.getCheckpointStore().addCheckpoint(checkpoint);
+
+ // (i) Allow non restored state (should succeed)
+ coord.restoreLatestCheckpointedState(tasks, true, true);
+
+ // (ii) Don't allow non restored state (should fail)
+ try {
+ coord.restoreLatestCheckpointedState(tasks, true, false);
+ fail("Did not throw the expected Exception.");
+ } catch (IllegalStateException ignored) {
+ }
+ }
+
// ------------------------------------------------------------------------
private Execution mockExecution() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index b594f4e..e1b83f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -78,7 +78,7 @@ public class SavepointLoaderTest {
tasks.put(vertexId, vertex);
// 1) Load and validate: everything correct
- CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+ CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
assertEquals(jobId, loaded.getJobId());
assertEquals(checkpointId, loaded.getCheckpointID());
@@ -87,20 +87,23 @@ public class SavepointLoaderTest {
when(vertex.getMaxParallelism()).thenReturn(222);
try {
- SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+ SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
fail("Did not throw expected Exception");
} catch (IllegalStateException expected) {
assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
}
- // 3) Load and validate: missing vertex (this should be relaxed)
+ // 3) Load and validate: missing vertex
assertNotNull(tasks.remove(vertexId));
try {
- SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+ SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
fail("Did not throw expected Exception");
} catch (IllegalStateException expected) {
- assertTrue(expected.getMessage().contains("Cannot map old state"));
+ assertTrue(expected.getMessage().contains("allowNonRestoredState"));
}
+
+ // 4) Load and validate: ignore missing vertex
+ SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b5e5d45..ff604f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -45,12 +45,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
@@ -60,6 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -786,7 +789,7 @@ public class JobManagerTest {
Future<Object> future = jobManager.ask(msg, timeout);
Object result = Await.result(future, timeout);
- assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess);
+ assertTrue("Did not trigger savepoint", result instanceof TriggerSavepointSuccess);
assertEquals(1, targetDirectory.listFiles().length);
} finally {
if (actorSystem != null) {
@@ -806,4 +809,157 @@ public class JobManagerTest {
}
}
}
+
+ /**
+ * Tests that configured {@link SavepointRestoreSettings} are respected: restoring a savepoint whose state cannot be mapped to the new job fails unless {@code allowNonRestoredState} is set.
+ */
+ @Test
+ public void testSavepointRestoreSettings() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+ ActorSystem actorSystem = null;
+ ActorGateway jobManager = null;
+ ActorGateway archiver = null;
+ ActorGateway taskManager = null;
+ try {
+ actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ new Configuration(),
+ actorSystem,
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ jobManager = new AkkaActorGateway(master._1(), null);
+ archiver = new AkkaActorGateway(master._2(), null);
+
+ Configuration tmConfig = new Configuration();
+ tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ tmConfig,
+ ResourceID.generate(),
+ actorSystem,
+ "localhost",
+ Option.apply("tm"),
+ Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+ true,
+ TestingTaskManager.class);
+
+ taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+ // Wait until connected
+ Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+ Await.ready(taskManager.ask(msg, timeout), timeout);
+
+ // Create job graph
+ JobVertex sourceVertex = new JobVertex("Source");
+ sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+ sourceVertex.setParallelism(1);
+
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Long.MAX_VALUE, // deactivated checkpointing
+ 360000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none());
+
+ jobGraph.setSnapshotSettings(snapshottingSettings);
+
+ // Submit job graph (fail fast here instead of timing out below if the submission is rejected)
+ msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+ Object response = Await.result(jobManager.ask(msg, timeout), timeout);
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+ // Wait for all tasks to be running
+ msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Trigger savepoint
+ File targetDirectory = tmpFolder.newFolder();
+ msg = new TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath()));
+ Object result = Await.result(jobManager.ask(msg, timeout), timeout);
+ assertTrue("Did not trigger savepoint: " + result, result instanceof TriggerSavepointSuccess);
+
+ String savepointPath = ((TriggerSavepointSuccess) result).savepointPath();
+
+ // Cancel the original job and wait until it has been removed before submitting the modified job
+ msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
+ Future<?> removedFuture = jobManager.ask(msg, timeout);
+
+ response = Await.result(jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout), timeout);
+ assertTrue("Unexpected response: " + response, response instanceof CancellationSuccess);
+
+ Await.ready(removedFuture, timeout);
+
+ // Create a modified job whose only vertex has a new JobVertexID, so the savepoint state cannot be mapped
+ JobVertex newSourceVertex = new JobVertex("NewSource");
+ newSourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+ newSourceVertex.setParallelism(1);
+
+ JobGraph newJobGraph = new JobGraph("NewTestingJob", newSourceVertex);
+
+ JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings(
+ Collections.singletonList(newSourceVertex.getID()),
+ Collections.singletonList(newSourceVertex.getID()),
+ Collections.singletonList(newSourceVertex.getID()),
+ Long.MAX_VALUE, // deactivated checkpointing
+ 360000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none());
+
+ newJobGraph.setSnapshotSettings(newSnapshottingSettings);
+
+ SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savepointPath, false);
+ newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+ msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+ response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobResultFailure);
+
+ JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
+ Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
+
+ assertTrue("Unexpected cause: " + cause, cause instanceof IllegalStateException);
+ assertTrue("Unexpected message: " + cause.getMessage(), cause.getMessage().contains("allowNonRestoredState"));
+
+ // Wait until removed
+ msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
+ Await.ready(jobManager.ask(msg, timeout), timeout);
+
+ // Resubmit, but allow non restored state now
+ restoreSettings = SavepointRestoreSettings.forPath(savepointPath, true);
+ newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+ msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+ response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+ } finally {
+ if (taskManager != null) {
+ taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (jobManager != null) {
+ jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (archiver != null) {
+ archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (actorSystem != null) { // shut down last, after the actors have been told to stop
+ actorSystem.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index da25ae6..5a64173 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -192,7 +193,7 @@ public class RescalingITCase extends TestLogger {
JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100);
- scaledJobGraph.setSavepointPath(savepointPath);
+ scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
jobID = scaledJobGraph.getJobID();
@@ -284,7 +285,7 @@ public class RescalingITCase extends TestLogger {
JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
- scaledJobGraph.setSavepointPath(savepointPath);
+ scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
jobID = scaledJobGraph.getJobID();
@@ -397,7 +398,7 @@ public class RescalingITCase extends TestLogger {
true,
100);
- scaledJobGraph.setSavepointPath(savepointPath);
+ scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
jobID = scaledJobGraph.getJobID();
@@ -523,7 +524,7 @@ public class RescalingITCase extends TestLogger {
JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, checkpointMethod);
- scaledJobGraph.setSavepointPath(savepointPath);
+ scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
jobID = scaledJobGraph.getJobID();
http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index d794953..87cf80f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -283,7 +284,7 @@ public class SavepointITCase extends TestLogger {
}
// Set the savepoint path
- jobGraph.setSavepointPath(savepointPath);
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
"savepoint path " + savepointPath + " in detached mode.");
@@ -452,8 +453,8 @@ public class SavepointITCase extends TestLogger {
final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000, 1000);
// Set non-existing savepoint path
- jobGraph.setSavepointPath("unknown path");
- assertEquals("unknown path", jobGraph.getSnapshotSettings().getSavepointPath());
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
+ assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");