You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 06:34:56 UTC

[2/2] flink git commit: [FLINK-4445] [checkpointing] Add option to allow non restored checkpoint state

[FLINK-4445] [checkpointing] Add option to allow non restored checkpoint state

Allows skipping checkpoint state that cannot be mapped to a job vertex when
restoring.

This closes #2712.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0e620f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0e620f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0e620f0

Branch: refs/heads/master
Commit: c0e620f0ace0aa3500a5642e7165cf9f05e81f6a
Parents: 74c0770
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Oct 26 18:05:26 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 07:34:21 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  27 ++-
 .../runtime/checkpoint/CompletedCheckpoint.java |  10 ++
 .../checkpoint/StateAssignmentOperation.java    |  32 ++--
 .../checkpoint/savepoint/SavepointLoader.java   |  18 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +-
 .../jobgraph/SavepointRestoreSettings.java      |  34 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  15 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   8 +-
 .../checkpoint/CheckpointStateRestoreTest.java  | 177 ++++++++++---------
 .../savepoint/SavepointLoaderTest.java          |  13 +-
 .../runtime/jobmanager/JobManagerTest.java      | 158 ++++++++++++++++-
 .../test/checkpointing/RescalingITCase.java     |   9 +-
 .../test/checkpointing/SavepointITCase.java     |   7 +-
 13 files changed, 361 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 01cd37e..26702c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -760,10 +761,32 @@ public class CheckpointCoordinator {
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Restores the latest checkpointed state.
+	 *
+	 * @param tasks Map of job vertices to restore. State for these vertices is
+	 * restored via {@link Execution#setInitialState(TaskStateHandles)}.
+	 * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
+	 * restore from.
+	 * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
+	 * to any job vertex in tasks.
+	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+	 * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+	 * @throws IllegalStateException If no completed checkpoint is available and
+	 *                               the <code>errorIfNoCheckpoint</code> flag has been set.
+	 * @throws IllegalStateException If the checkpoint contains state that cannot be
+	 *                               mapped to any job vertex in <code>tasks</code> and the
+	 *                               <code>allowNonRestoredState</code> flag has not been set.
+	 * @throws IllegalStateException If the max parallelism changed for an operator
+	 *                               that restores state from this checkpoint.
+	 * @throws IllegalStateException If the parallelism changed for an operator
+	 *                               that restores <i>non-partitioned</i> state from this
+	 *                               checkpoint.
+	 */
 	public boolean restoreLatestCheckpointedState(
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			boolean errorIfNoCheckpoint,
-			boolean allOrNothingState) throws Exception {
+			boolean allowNonRestoredState) throws Exception {
 
 		synchronized (lock) {
 			if (shutdown) {
@@ -787,7 +810,7 @@ public class CheckpointCoordinator {
 			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
 			StateAssignmentOperation stateAssignmentOperation =
-					new StateAssignmentOperation(tasks, latest, allOrNothingState);
+					new StateAssignmentOperation(LOG, tasks, latest, allowNonRestoredState);
 
 			stateAssignmentOperation.assignStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e135272..3c33ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -70,6 +70,16 @@ public class CompletedCheckpoint implements Serializable {
 			long checkpointID,
 			long timestamp,
 			long completionTimestamp,
+			Map<JobVertexID, TaskState> taskStates) {
+
+		this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null);
+	}
+
+	public CompletedCheckpoint(
+			JobID job,
+			long checkpointID,
+			long timestamp,
+			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates,
 			CheckpointProperties props,
 			String externalPath) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 8e2b0bf..d98c8e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,20 +43,23 @@ import java.util.Map;
  */
 public class StateAssignmentOperation {
 
+	private final Logger logger;
+	private final Map<JobVertexID, ExecutionJobVertex> tasks;
+	private final CompletedCheckpoint latest;
+	private final boolean allowNonRestoredState;
+
 	public StateAssignmentOperation(
+			Logger logger,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			CompletedCheckpoint latest,
-			boolean allOrNothingState) {
+			boolean allowNonRestoredState) {
 
+		this.logger = logger;
 		this.tasks = tasks;
 		this.latest = latest;
-		this.allOrNothingState = allOrNothingState;
+		this.allowNonRestoredState = allowNonRestoredState;
 	}
 
-	private final Map<JobVertexID, ExecutionJobVertex> tasks;
-	private final CompletedCheckpoint latest;
-	private final boolean allOrNothingState;
-
 	public boolean assignStates() throws Exception {
 
 		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
@@ -101,15 +105,10 @@ public class StateAssignmentOperation {
 				List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
 				List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
 
-				int counter = 0;
 				for (int p = 0; p < oldParallelism; ++p) {
-
 					SubtaskState subtaskState = taskState.getState(p);
 
 					if (null != subtaskState) {
-
-						++counter;
-
 						collectParallelStatesByChainOperator(
 								parallelOpStatesBackend, subtaskState.getManagedOperatorState());
 
@@ -128,11 +127,6 @@ public class StateAssignmentOperation {
 					}
 				}
 
-				if (allOrNothingState && counter > 0 && counter < oldParallelism) {
-					throw new IllegalStateException("The checkpoint contained state only for " +
-							"a subset of tasks for vertex " + executionJobVertex);
-				}
-
 				// operator chain index -> lists with collected states (one collection for each parallel subtasks)
 				@SuppressWarnings("unchecked")
 				List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
@@ -222,7 +216,8 @@ public class StateAssignmentOperation {
 
 					currentExecutionAttempt.setInitialState(taskStateHandles);
 				}
-
+			} else if (allowNonRestoredState) {
+				logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
 			} else {
 				throw new IllegalStateException("There is no execution job vertex for the job" +
 						" vertex ID " + taskGroupStateEntry.getKey());
@@ -230,7 +225,6 @@ public class StateAssignmentOperation {
 		}
 
 		return true;
-
 	}
 
 	/**
@@ -326,4 +320,4 @@ public class StateAssignmentOperation {
 			return repackStream;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 845008d..1819120 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -34,6 +36,8 @@ import java.util.Map;
  */
 public class SavepointLoader {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointLoader.class);
+
 	/**
 	 * Loads a savepoint back as a {@link CompletedCheckpoint}.
 	 *
@@ -42,6 +46,8 @@ public class SavepointLoader {
 	 * @param jobId          The JobID of the job to load the savepoint for.
 	 * @param tasks          Tasks that will possibly be reset
 	 * @param savepointPath  The path of the savepoint to rollback to
+	 * @param allowNonRestoredState Allow skipping checkpoint state that cannot be mapped
+	 * to any job vertex in tasks.
 	 *
 	 * @throws IllegalStateException If mismatch between program and savepoint state
 	 * @throws Exception             If savepoint store failure
@@ -49,7 +55,8 @@ public class SavepointLoader {
 	public static CompletedCheckpoint loadAndValidateSavepoint(
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
-			String savepointPath) throws IOException {
+			String savepointPath,
+			boolean allowNonRestoredState) throws IOException {
 
 		// (1) load the savepoint
 		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
@@ -76,11 +83,14 @@ public class SavepointLoader {
 
 					throw new IllegalStateException(msg);
 				}
+			} else if (allowNonRestoredState) {
+				LOG.info("Skipping savepoint state for operator {}.", taskState.getJobVertexID());
 			} else {
 				String msg = String.format("Failed to rollback to savepoint %s. " +
-								"Cannot map old state for task %s to the new program. " +
-								"This indicates that the program has been changed in a " +
-								"non-compatible way  after the savepoint.",
+								"Cannot map savepoint state for operator %s to the new program, " +
+								"because the operator is not available in the new program. If " +
+								"you want to allow to skip this, you can set the --allowNonRestoredState " +
+								"option on the CLI.",
 						savepointPath, taskState.getJobVertexID());
 				throw new IllegalStateException(msg);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 244a113..8a4f3ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,12 +911,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 *
 	 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 	 * block the job manager actor and run asynchronously.
-	 * 
+	 *
+	 * @param errorIfNoCheckpoint Fail if there is no checkpoint available
+	 * @param allowNonRestoredState Allow skipping checkpoint state that cannot be mapped
+	 * to the ExecutionGraph vertices (if the checkpoint contains state for a
+	 * job vertex that is not part of this ExecutionGraph).
 	 */
-	public void restoreLatestCheckpointedState() throws Exception {
+	public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
 		synchronized (progressLock) {
 			if (checkpointCoordinator != null) {
-				checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+				checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), errorIfNoCheckpoint, allowNonRestoredState);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 48d3997..10119bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -33,26 +33,26 @@ public class SavepointRestoreSettings implements Serializable {
 	private final static SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
 
 	/** By default, be strict when restoring from a savepoint.  */
-	private final static boolean DEFAULT_IGNORE_UNMAPPED_STATE = false;
+	private final static boolean DEFAULT_ALLOW_NON_RESTORED_STATE = false;
 
 	/** Savepoint restore path. */
 	private final String restorePath;
 
 	/**
-	 * Flag indicating whether the restore should ignore if the savepoint contains
-	 * state for an operator that is not part of the job.
+	 * Flag indicating whether non restored state is allowed if the savepoint
+	 * contains state for an operator that is not part of the job.
 	 */
-	private final boolean ignoreUnmappedState;
+	private final boolean allowNonRestoredState;
 
 	/**
 	 * Creates the restore settings.
 	 *
 	 * @param restorePath Savepoint restore path.
-	 * @param ignoreUnmappedState Ignore unmapped state.
+	 * @param allowNonRestoredState Whether non restored state is allowed.
 	 */
-	private SavepointRestoreSettings(String restorePath, boolean ignoreUnmappedState) {
+	private SavepointRestoreSettings(String restorePath, boolean allowNonRestoredState) {
 		this.restorePath = restorePath;
-		this.ignoreUnmappedState = ignoreUnmappedState;
+		this.allowNonRestoredState = allowNonRestoredState;
 	}
 
 	/**
@@ -73,14 +73,14 @@ public class SavepointRestoreSettings implements Serializable {
 	}
 
 	/**
-	 * Returns whether the restore should ignore whether the savepoint contains
-	 * state that cannot be mapped to the job.
+	 * Returns whether non restored state is allowed if the savepoint contains
+	 * state that cannot be mapped back to the job.
 	 *
-	 * @return <code>true</code> if restore should ignore whether the savepoint contains
-	 * state that cannot be mapped to the job.
+	 * @return <code>true</code> if non restored state is allowed if the savepoint
+	 * contains state that cannot be mapped back to the job.
 	 */
-	public boolean ignoreUnmappedState() {
-		return ignoreUnmappedState;
+	public boolean allowNonRestoredState() {
+		return allowNonRestoredState;
 	}
 
 	@Override
@@ -88,7 +88,7 @@ public class SavepointRestoreSettings implements Serializable {
 		if (restoreSavepoint()) {
 			return "SavepointRestoreSettings.forPath(" +
 					"restorePath='" + restorePath + '\'' +
-					", ignoreUnmappedState=" + ignoreUnmappedState +
+					", allowNonRestoredState=" + allowNonRestoredState +
 					')';
 		} else {
 			return "SavepointRestoreSettings.none()";
@@ -102,12 +102,12 @@ public class SavepointRestoreSettings implements Serializable {
 	}
 
 	public static SavepointRestoreSettings forPath(String savepointPath) {
-		return forPath(savepointPath, DEFAULT_IGNORE_UNMAPPED_STATE);
+		return forPath(savepointPath, DEFAULT_ALLOW_NON_RESTORED_STATE);
 	}
 
-	public static SavepointRestoreSettings forPath(String savepointPath, boolean ignoreUnmappedState) {
+	public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState) {
 		checkNotNull(savepointPath, "Savepoint restore path.");
-		return new SavepointRestoreSettings(savepointPath, ignoreUnmappedState);
+		return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 49f7f3f..9cc8be6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1317,17 +1317,14 @@ class JobManager(
             if (savepointSettings.restoreSavepoint()) {
               try {
                 val savepointPath = savepointSettings.getRestorePath()
-                val ignoreUnmapped = savepointSettings.ignoreUnmappedState()
+                val allowNonRestored = savepointSettings.allowNonRestoredState()
 
-                if (ignoreUnmapped) {
-                  log.info(s"Starting job from savepoint '$savepointPath' (ignore unmapped state).")
-                } else {
-                  log.info(s"Starting job from savepoint '$savepointPath'.")
-                }
+                log.info(s"Starting job from savepoint '$savepointPath'" +
+                  (if (allowNonRestored) " (allowing non restored state)" else "") + ".")
 
                 // load the savepoint as a checkpoint into the system
                 val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
-                  jobId, executionGraph.getAllVertices, savepointPath, ignoreUnmapped)
+                  jobId, executionGraph.getAllVertices, savepointPath, allowNonRestored)
 
                 executionGraph.getCheckpointCoordinator.getCheckpointStore
                   .addCheckpoint(savepoint)
@@ -1338,9 +1335,11 @@ class JobManager(
                 executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
                   .setCount(nextCheckpointId)
 
-                executionGraph.restoreLatestCheckpointedState(true, ignoreUnmapped)
+                executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
               } catch {
                 case e: Exception =>
+                  jobInfo.notifyClients(
+                    decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                   throw new SuppressRestartsException(e)
               }
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ab41113..b874612 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1868,7 +1868,7 @@ public class CheckpointCoordinatorTest {
 		tasks.put(jobVertexID1, jobVertex1);
 		tasks.put(jobVertexID2, jobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		coord.restoreLatestCheckpointedState(tasks, true, false);
 
 		// verify the restored state
 		verifiyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
@@ -1984,7 +1984,7 @@ public class CheckpointCoordinatorTest {
 		tasks.put(jobVertexID1, newJobVertex1);
 		tasks.put(jobVertexID2, newJobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		coord.restoreLatestCheckpointedState(tasks, true, false);
 
 		fail("The restoration should have failed because the max parallelism changed.");
 	}
@@ -2106,7 +2106,7 @@ public class CheckpointCoordinatorTest {
 		tasks.put(jobVertexID1, newJobVertex1);
 		tasks.put(jobVertexID2, newJobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		coord.restoreLatestCheckpointedState(tasks, true, false);
 
 		fail("The restoration should have failed because the parallelism of an vertex with " +
 			"non-partitioned state changed.");
@@ -2249,7 +2249,7 @@ public class CheckpointCoordinatorTest {
 
 		tasks.put(jobVertexID1, newJobVertex1);
 		tasks.put(jobVertexID2, newJobVertex2);
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		coord.restoreLatestCheckpointedState(tasks, true, false);
 
 		// verify the restored state
 		verifiyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 359262f..6e5279b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -59,6 +59,9 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointStateRestoreTest {
 
+	/**
+	 * Tests that on restore the task state is reset for each stateful task.
+	 */
 	@Test
 	public void testSetState() {
 		try {
@@ -93,7 +96,6 @@ public class CheckpointStateRestoreTest {
 			map.put(statefulId, stateful);
 			map.put(statelessId, stateless);
 
-
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 					jid,
 					200000L,
@@ -167,92 +169,6 @@ public class CheckpointStateRestoreTest {
 	}
 
 	@Test
-	public void testStateOnlyPartiallyAvailable() {
-		try {
-			final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
-			KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
-			List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
-			final KeyGroupsStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
-
-			final JobID jid = new JobID();
-			final JobVertexID statefulId = new JobVertexID();
-			final JobVertexID statelessId = new JobVertexID();
-
-			Execution statefulExec1 = mockExecution();
-			Execution statefulExec2 = mockExecution();
-			Execution statefulExec3 = mockExecution();
-			Execution statelessExec1 = mockExecution();
-			Execution statelessExec2 = mockExecution();
-
-			ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0, 3);
-			ExecutionVertex stateful2 = mockExecutionVertex(statefulExec2, statefulId, 1, 3);
-			ExecutionVertex stateful3 = mockExecutionVertex(statefulExec3, statefulId, 2, 3);
-			ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0, 2);
-			ExecutionVertex stateless2 = mockExecutionVertex(statelessExec2, statelessId, 1, 2);
-
-			ExecutionJobVertex stateful = mockExecutionJobVertex(statefulId,
-					new ExecutionVertex[] { stateful1, stateful2, stateful3 });
-			ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
-					new ExecutionVertex[] { stateless1, stateless2 });
-
-			Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
-			map.put(statefulId, stateful);
-			map.put(statelessId, stateless);
-
-
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid,
-					200000L,
-					200000L,
-					0,
-					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
-					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0],
-					new StandaloneCheckpointIDCounter(),
-					new StandaloneCompletedCheckpointStore(1),
-					null,
-					new DisabledCheckpointStatsTracker());
-
-			// create ourselves a checkpoint with state
-			final long timestamp = 34623786L;
-			coord.triggerCheckpoint(timestamp, false);
-
-			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
-			final long checkpointId = pending.getCheckpointId();
-
-			// the difference to the test "testSetState" is that one stateful subtask does not report state
-			SubtaskState checkpointStateHandles =
-					new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null, 0L);
-
-			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
-
-			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-
-			// let the coordinator inject the state
-			try {
-				coord.restoreLatestCheckpointedState(map, true, true);
-				fail("this should fail with an exception");
-			}
-			catch (IllegalStateException e) {
-				// swish!
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
 	public void testNoCheckpointAvailable() {
 		try {
 			CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -284,6 +200,93 @@ public class CheckpointStateRestoreTest {
 		}
 	}
 
+	/**
+	 * Tests that the allow non restored state flag is correctly handled.
+	 *
+	 * The flag only applies for state that is part of the checkpoint.
+	 */
+	@Test
+	public void testNonRestoredState() throws Exception {
+		// --- (1) Create tasks to restore checkpoint with ---
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+
+		// 1st JobVertex
+		ExecutionVertex vertex11 = mockExecutionVertex(mockExecution(), jobVertexId1, 0, 3);
+		ExecutionVertex vertex12 = mockExecutionVertex(mockExecution(), jobVertexId1, 1, 3);
+		ExecutionVertex vertex13 = mockExecutionVertex(mockExecution(), jobVertexId1, 2, 3);
+		// 2nd JobVertex
+		ExecutionVertex vertex21 = mockExecutionVertex(mockExecution(), jobVertexId2, 0, 2);
+		ExecutionVertex vertex22 = mockExecutionVertex(mockExecution(), jobVertexId2, 1, 2);
+
+		ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexId1, new ExecutionVertex[] { vertex11, vertex12, vertex13 });
+		ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexId2, new ExecutionVertex[] { vertex21, vertex22 });
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(jobVertexId1, jobVertex1);
+		tasks.put(jobVertexId2, jobVertex2);
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				new JobID(),
+				Integer.MAX_VALUE,
+				Integer.MAX_VALUE,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
+
+		ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest
+				.generateChainedStateHandle(new SerializableObject());
+
+		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
+		Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();
+		{
+			TaskState taskState = new TaskState(jobVertexId1, 3, 3, 1);
+			taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
+			taskState.putState(1, new SubtaskState(serializedState, null, null, null, null));
+			taskState.putState(2, new SubtaskState(serializedState, null, null, null, null));
+
+			checkpointTaskStates.put(jobVertexId1, taskState);
+		}
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 1, 2, new HashMap<>(checkpointTaskStates));
+
+		coord.getCheckpointStore().addCheckpoint(checkpoint);
+
+		coord.restoreLatestCheckpointedState(tasks, true, false);
+		coord.restoreLatestCheckpointedState(tasks, true, true);
+
+		// --- (3) JobVertex missing for task state that is part of the checkpoint ---
+		JobVertexID newJobVertexID = new JobVertexID();
+
+		// There is no task for this
+		{
+			TaskState taskState = new TaskState(jobVertexId1, 1, 1, 1);
+			taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
+
+			checkpointTaskStates.put(newJobVertexID, taskState);
+		}
+
+		checkpoint = new CompletedCheckpoint(new JobID(), 1, 2, 3, new HashMap<>(checkpointTaskStates));
+
+		coord.getCheckpointStore().addCheckpoint(checkpoint);
+
+		// (i) Allow non restored state (should succeed)
+		coord.restoreLatestCheckpointedState(tasks, true, true);
+
+		// (ii) Don't allow non restored state (should fail)
+		try {
+			coord.restoreLatestCheckpointedState(tasks, true, false);
+			fail("Did not throw the expected Exception.");
+		} catch (IllegalStateException ignored) {
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private Execution mockExecution() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index b594f4e..e1b83f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -78,7 +78,7 @@ public class SavepointLoaderTest {
 		tasks.put(vertexId, vertex);
 
 		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
 
 		assertEquals(jobId, loaded.getJobId());
 		assertEquals(checkpointId, loaded.getCheckpointID());
@@ -87,20 +87,23 @@ public class SavepointLoaderTest {
 		when(vertex.getMaxParallelism()).thenReturn(222);
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
 		}
 
-		// 3) Load and validate: missing vertex (this should be relaxed)
+		// 3) Load and validate: missing vertex
 		assertNotNull(tasks.remove(vertexId));
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
-			assertTrue(expected.getMessage().contains("Cannot map old state"));
+			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
 		}
+
+		// 4) Load and validate: ignore missing vertex
+		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b5e5d45..ff604f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -45,12 +45,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
@@ -60,6 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -786,7 +789,7 @@ public class JobManagerTest {
 			Future<Object> future = jobManager.ask(msg, timeout);
 			Object result = Await.result(future, timeout);
 
-			assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess);
+			assertTrue("Did not trigger savepoint", result instanceof TriggerSavepointSuccess);
 			assertEquals(1, targetDirectory.listFiles().length);
 		} finally {
 			if (actorSystem != null) {
@@ -806,4 +809,157 @@ public class JobManagerTest {
 			}
 		}
 	}
+
+	/**
+	 * Tests that configured {@link SavepointRestoreSettings} are respected: restoring a savepoint into a job without a matching vertex fails unless non restored state is allowed.
+	 */
+	@Test
+	public void testSavepointRestoreSettings() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+					new Configuration(),
+					actorSystem,
+					Option.apply("jm"),
+					Option.apply("arch"),
+					TestingJobManager.class,
+					TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			Configuration tmConfig = new Configuration();
+			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+					tmConfig,
+					ResourceID.generate(),
+					actorSystem,
+					"localhost",
+					Option.apply("tm"),
+					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+					true,
+					TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until the task manager has registered at the job manager
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph with a single stateful source vertex
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none());
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Trigger savepoint and verify the response type before casting it below
+			File targetDirectory = tmpFolder.newFolder();
+			msg = new TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath()));
+			Object result = Await.result(jobManager.ask(msg, timeout), timeout);
+			assertTrue("Unexpected response: " + result, result instanceof TriggerSavepointSuccess);
+
+			String savepointPath = ((TriggerSavepointSuccess) result).savepointPath();
+
+			// Cancel because of restarts; the removal notification is registered before cancelling
+			msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
+			Future<?> removedFuture = jobManager.ask(msg, timeout);
+
+			Future<?> cancelFuture = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout);
+			Object response = Await.result(cancelFuture, timeout);
+			assertTrue("Unexpected response: " + response, response instanceof CancellationSuccess);
+
+			Await.ready(removedFuture, timeout);
+
+			// Adjust the job (we need a new operator ID, so a fresh JobVertex replaces the old one)
+			JobVertex newSourceVertex = new JobVertex("NewSource");
+			newSourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			newSourceVertex.setParallelism(1);
+
+			JobGraph newJobGraph = new JobGraph("NewTestingJob", newSourceVertex);
+
+			JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(newSourceVertex.getID()),
+					Collections.singletonList(newSourceVertex.getID()),
+					Collections.singletonList(newSourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none());
+
+			newJobGraph.setSnapshotSettings(newSnapshottingSettings);
+
+			SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savepointPath, false);
+			newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobResultFailure);
+
+			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
+			Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
+
+			assertTrue(cause instanceof IllegalStateException);
+			assertTrue(cause.getMessage().contains("allowNonRestoredState"));
+
+			// Wait until the failed job has been removed
+			msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
+			Await.ready(jobManager.ask(msg, timeout), timeout);
+
+			// Resubmit, but allow non restored state now
+			restoreSettings = SavepointRestoreSettings.forPath(savepointPath, true);
+			newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index da25ae6..5a64173 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -192,7 +193,7 @@ public class RescalingITCase extends TestLogger {
 
 			JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100);
 
-			scaledJobGraph.setSavepointPath(savepointPath);
+			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			jobID = scaledJobGraph.getJobID();
 
@@ -284,7 +285,7 @@ public class RescalingITCase extends TestLogger {
 
 			JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
 
-			scaledJobGraph.setSavepointPath(savepointPath);
+			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			jobID = scaledJobGraph.getJobID();
 
@@ -397,7 +398,7 @@ public class RescalingITCase extends TestLogger {
 				true,
 				100);
 
-			scaledJobGraph.setSavepointPath(savepointPath);
+			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			jobID = scaledJobGraph.getJobID();
 
@@ -523,7 +524,7 @@ public class RescalingITCase extends TestLogger {
 
 			JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, checkpointMethod);
 
-			scaledJobGraph.setSavepointPath(savepointPath);
+			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			jobID = scaledJobGraph.getJobID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c0e620f0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index d794953..87cf80f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -283,7 +284,7 @@ public class SavepointITCase extends TestLogger {
 							}
 
 							// Set the savepoint path
-							jobGraph.setSavepointPath(savepointPath);
+							jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 							LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
 									"savepoint path " + savepointPath + " in detached mode.");
@@ -452,8 +453,8 @@ public class SavepointITCase extends TestLogger {
 			final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000, 1000);
 
 			// Set non-existing savepoint path
-			jobGraph.setSavepointPath("unknown path");
-			assertEquals("unknown path", jobGraph.getSnapshotSettings().getSavepointPath());
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
+			assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");