You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/02 07:10:23 UTC

[2/2] flink git commit: [FLINK-4445] [checkpointing] Add option to allow non restored savepoint state

[FLINK-4445] [checkpointing] Add option to allow non restored savepoint state

Allows to skip savepoint state that cannot be mapped to a job vertex when
restoring.

This closes #2713.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da32af12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da32af12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da32af12

Branch: refs/heads/release-1.1
Commit: da32af1236c321a55e9b877524b04123624f1d69
Parents: 1f91261
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Oct 27 09:49:01 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 2 08:08:45 2016 +0100

----------------------------------------------------------------------
 .../savepoint/SavepointCoordinator.java         |  20 ++-
 .../runtime/executiongraph/ExecutionGraph.java  |  15 +-
 .../jobgraph/SavepointRestoreSettings.java      |  34 ++---
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../SavepointCoordinatorRestoreTest.java        | 150 ++++++++++++++++++
 .../savepoint/SavepointCoordinatorTest.java     |   8 +-
 .../runtime/jobmanager/JobManagerTest.java      | 153 +++++++++++++++++++
 7 files changed, 347 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index cd9eb0c..2cb8636 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -181,13 +181,16 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 	 *
 	 * @param tasks         Tasks that will possibly be reset
 	 * @param savepointPath The path of the savepoint to rollback to
+	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
+	 * to any job vertex in tasks.
 	 * @throws IllegalStateException If coordinator is shut down
 	 * @throws IllegalStateException If mismatch between program and savepoint state
 	 * @throws Exception             If savepoint store failure
 	 */
 	public void restoreSavepoint(
 			Map<JobVertexID, ExecutionJobVertex> tasks,
-			String savepointPath) throws Exception {
+			String savepointPath,
+			boolean allowNonRestoredState) throws Exception {
 
 		checkNotNull(savepointPath, "Savepoint path");
 
@@ -208,8 +211,8 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 						String msg = String.format("Failed to rollback to savepoint %s. " +
 										"Parallelism mismatch between savepoint state and new program. " +
 										"Cannot map operator %s with parallelism %d to new program with " +
-										"parallelism %d. This indicates that the program has been changed " +
-										"in a non-compatible way after the savepoint.",
+										"parallelism %d. You cannot change parallelism of " +
+										"Flink 1.1 programs.",
 								savepoint,
 								taskState.getJobVertexID(),
 								taskState.getParallelism(),
@@ -239,12 +242,15 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 						currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
 					}
+				} else if (allowNonRestoredState) {
+					LOG.info("Ignoring checkpoint state for operator {}.", taskState.getJobVertexID());
 				} else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
-									"Cannot map old state for task %s to the new program. " +
-									"This indicates that the program has been changed in a " +
-									"non-compatible way  after the savepoint.", savepointPath,
-							taskState.getJobVertexID());
+									"Cannot map savepoint state for operator %s to the new program, " +
+									"because the operator is not available in the new program. If " +
+									"you want to allow this, you can set the --allowNonRestoredState " +
+									"option on the CLI.",
+							savepointPath, taskState.getJobVertexID());
 					throw new IllegalStateException(msg);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ce8e0c9..d3b48df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -953,7 +953,7 @@ public class ExecutionGraph implements Serializable {
 					if (!restored && savepointCoordinator != null) {
 						String savepointPath = savepointCoordinator.getSavepointRestorePath();
 						if (savepointPath != null) {
-							savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
+							savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath, false);
 						}
 					}
 				}
@@ -990,21 +990,20 @@ public class ExecutionGraph implements Serializable {
 	 * actor.
 	 *
 	 * @param savepointPath The path of the savepoint to rollback to.
+	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
+	 * to any job vertex in tasks.
 	 * @throws IllegalStateException If checkpointing is disabled
 	 * @throws IllegalStateException If checkpoint coordinator is shut down
 	 * @throws Exception If failure during rollback
 	 */
-	public void restoreSavepoint(String savepointPath) throws Exception {
+	public void restoreSavepoint(String savepointPath, boolean allowNonRestoredState) throws Exception {
 		synchronized (progressLock) {
 			if (savepointCoordinator != null) {
 				LOG.info("Restoring savepoint: " + savepointPath + ".");
-
 				savepointCoordinator.restoreSavepoint(
-						getAllVertices(), savepointPath);
-			}
-			else {
-				// Sanity check
-				throw new IllegalStateException("Checkpointing disabled.");
+						getAllVertices(), savepointPath, allowNonRestoredState);
+			} else {
+				throw new IllegalStateException("Savepoints disabled.");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 48d3997..10119bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -33,26 +33,26 @@ public class SavepointRestoreSettings implements Serializable {
 	private final static SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
 
 	/** By default, be strict when restoring from a savepoint.  */
-	private final static boolean DEFAULT_IGNORE_UNMAPPED_STATE = false;
+	private final static boolean DEFAULT_ALLOW_NON_RESTORED_STATE = false;
 
 	/** Savepoint restore path. */
 	private final String restorePath;
 
 	/**
-	 * Flag indicating whether the restore should ignore if the savepoint contains
-	 * state for an operator that is not part of the job.
+	 * Flag indicating whether non restored state is allowed if the savepoint
+	 * contains state for an operator that is not part of the job.
 	 */
-	private final boolean ignoreUnmappedState;
+	private final boolean allowNonRestoredState;
 
 	/**
 	 * Creates the restore settings.
 	 *
 	 * @param restorePath Savepoint restore path.
-	 * @param ignoreUnmappedState Ignore unmapped state.
+	 * @param allowNonRestoredState Ignore unmapped state.
 	 */
-	private SavepointRestoreSettings(String restorePath, boolean ignoreUnmappedState) {
+	private SavepointRestoreSettings(String restorePath, boolean allowNonRestoredState) {
 		this.restorePath = restorePath;
-		this.ignoreUnmappedState = ignoreUnmappedState;
+		this.allowNonRestoredState = allowNonRestoredState;
 	}
 
 	/**
@@ -73,14 +73,14 @@ public class SavepointRestoreSettings implements Serializable {
 	}
 
 	/**
-	 * Returns whether the restore should ignore whether the savepoint contains
-	 * state that cannot be mapped to the job.
+	 * Returns whether non restored state is allowed if the savepoint contains
+	 * state that cannot be mapped back to the job.
 	 *
-	 * @return <code>true</code> if restore should ignore whether the savepoint contains
-	 * state that cannot be mapped to the job.
+	 * @return <code>true</code> if non restored state is allowed if the savepoint
+	 * contains state that cannot be mapped  back to the job.
 	 */
-	public boolean ignoreUnmappedState() {
-		return ignoreUnmappedState;
+	public boolean allowNonRestoredState() {
+		return allowNonRestoredState;
 	}
 
 	@Override
@@ -88,7 +88,7 @@ public class SavepointRestoreSettings implements Serializable {
 		if (restoreSavepoint()) {
 			return "SavepointRestoreSettings.forPath(" +
 					"restorePath='" + restorePath + '\'' +
-					", ignoreUnmappedState=" + ignoreUnmappedState +
+					", allowNonRestoredState=" + allowNonRestoredState +
 					')';
 		} else {
 			return "SavepointRestoreSettings.none()";
@@ -102,12 +102,12 @@ public class SavepointRestoreSettings implements Serializable {
 	}
 
 	public static SavepointRestoreSettings forPath(String savepointPath) {
-		return forPath(savepointPath, DEFAULT_IGNORE_UNMAPPED_STATE);
+		return forPath(savepointPath, DEFAULT_ALLOW_NON_RESTORED_STATE);
 	}
 
-	public static SavepointRestoreSettings forPath(String savepointPath, boolean ignoreUnmappedState) {
+	public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState) {
 		checkNotNull(savepointPath, "Savepoint restore path.");
-		return new SavepointRestoreSettings(savepointPath, ignoreUnmappedState);
+		return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a28c25f..e5b5267 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1299,7 +1299,9 @@ class JobManager(
               val savepointPath = savepointSettings.getRestorePath()
 
               try {
-                executionGraph.restoreSavepoint(savepointPath)
+                executionGraph.restoreSavepoint(
+                  savepointPath,
+                  savepointSettings.allowNonRestoredState())
               } catch {
                 case e: Exception =>
                   jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(e)))

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
new file mode 100644
index 0000000..cd03403
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests concerning the restoring of state from a savepoint to the task executions.
+ */
+public class SavepointCoordinatorRestoreTest {
+
+	/**
+	 * Tests that the unmapped state flag is correctly handled.
+	 *
+	 * The flag should only apply for state that is part of the checkpoint.
+	 */
+	@Test
+	public void testRestoreUnmappedCheckpointState() throws Exception {
+		// --- (1) Create tasks to restore checkpoint with ---
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+
+		// 1st JobVertex
+		ExecutionVertex vertex11 = mockExecutionVertex(mockExecution(), jobVertexId1, 0, 3);
+		ExecutionVertex vertex12 = mockExecutionVertex(mockExecution(), jobVertexId1, 1, 3);
+		ExecutionVertex vertex13 = mockExecutionVertex(mockExecution(), jobVertexId1, 2, 3);
+		// 2nd JobVertex
+		ExecutionVertex vertex21 = mockExecutionVertex(mockExecution(), jobVertexId2, 0, 2);
+		ExecutionVertex vertex22 = mockExecutionVertex(mockExecution(), jobVertexId2, 1, 2);
+
+		ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(jobVertexId1, new ExecutionVertex[] { vertex11, vertex12, vertex13 });
+		ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(jobVertexId2, new ExecutionVertex[] { vertex21, vertex22 });
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(jobVertexId1, jobVertex1);
+		tasks.put(jobVertexId2, jobVertex2);
+
+		SavepointStore store = new HeapSavepointStore();
+
+		SavepointCoordinator coord = new SavepointCoordinator(
+				new JobID(),
+				Integer.MAX_VALUE,
+				Integer.MAX_VALUE,
+				0,
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				new ExecutionVertex[] {},
+				getClass().getClassLoader(),
+				new StandaloneCheckpointIDCounter(),
+				store,
+				new DisabledCheckpointStatsTracker());
+
+		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
+		Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();
+		checkpointTaskStates.put(jobVertexId1, new TaskState(jobVertexId1, 3));
+
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 1, 2, new HashMap<>(checkpointTaskStates));
+
+		Savepoint savepoint = new SavepointV0(checkpoint.getCheckpointID(), checkpointTaskStates.values());
+		String savepointPath = store.storeSavepoint(savepoint);
+
+		coord.restoreSavepoint(tasks, savepointPath, false);
+		coord.restoreSavepoint(tasks, savepointPath, true);
+
+		// --- (3) JobVertex missing for task state that is part of the checkpoint ---
+		JobVertexID newJobVertexID = new JobVertexID();
+
+		// There is no task for this
+		checkpointTaskStates.put(newJobVertexID, new TaskState(newJobVertexID, 1));
+
+		checkpoint = new CompletedCheckpoint(new JobID(), 1, 2, 3, new HashMap<>(checkpointTaskStates));
+		savepoint = new SavepointV0(checkpoint.getCheckpointID(), checkpointTaskStates.values());
+		savepointPath = store.storeSavepoint(savepoint);
+
+		// (i) Ignore unmapped state (should succeed)
+		coord.restoreSavepoint(tasks, savepointPath, true);
+
+		// (ii) Don't ignore unmapped state (should fail)
+		try {
+			coord.restoreSavepoint(tasks, savepointPath, false);
+			fail("Did not throw the expected Exception.");
+		} catch (IllegalStateException ignored) {
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private Execution mockExecution() {
+		return mockExecution(ExecutionState.RUNNING);
+	}
+
+	private Execution mockExecution(ExecutionState state) {
+		Execution mock = Mockito.mock(Execution.class);
+		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
+		when(mock.getState()).thenReturn(state);
+		return mock;
+	}
+
+	private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
+		ExecutionVertex mock = Mockito.mock(ExecutionVertex.class);
+		when(mock.getJobvertexId()).thenReturn(vertexId);
+		when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
+		when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
+		when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
+		return mock;
+	}
+
+	private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
+		ExecutionJobVertex vertex = Mockito.mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(vertices.length);
+		when(vertex.getJobVertexId()).thenReturn(id);
+		when(vertex.getTaskVertices()).thenReturn(vertices);
+		return vertex;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index 6b4f354..b1b384d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -271,7 +271,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 		assertNotNull(savepointPath);
 
 		// Rollback
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath);
+		coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath, false);
 
 		// Verify all executions have been reset
 		for (ExecutionVertex vertex : ackVertices) {
@@ -339,7 +339,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 			// Rollback
 			coordinator.restoreSavepoint(
 					createExecutionJobVertexMap(jobVertices),
-					savepointPath);
+					savepointPath, false);
 			fail("Did not throw expected Exception after rollback with parallelism mismatch.");
 		}
 		catch (Exception ignored) {
@@ -385,7 +385,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 			// Rollback
 			coordinator.restoreSavepoint(
 					createExecutionJobVertexMap(jobVertex),
-					savepointPath);
+					savepointPath, false);
 
 			fail("Did not throw expected Exception after rollback with savepoint store failure.");
 		}
@@ -416,7 +416,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 				checkpointIdCounter,
 				savepointStore);
 
-		coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any");
+		coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any", false);
 
 		verify(checkpointIdCounter).setCount(eq(12312312L + 1));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da32af12/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5515c00..d60e060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
@@ -464,4 +466,155 @@ public class JobManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that configured {@link SavepointRestoreSettings} are respected.
+	 */
+	@Test
+	public void testSavepointRestoreSettings() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+					new Configuration(),
+					actorSystem,
+					Option.apply("jm"),
+					Option.apply("arch"),
+					TestingJobManager.class,
+					TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			Configuration tmConfig = new Configuration();
+			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+					tmConfig,
+					ResourceID.generate(),
+					actorSystem,
+					"localhost",
+					Option.apply("tm"),
+					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+					true,
+					TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE);
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Trigger savepoint
+			msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID());
+			Future<Object> future = jobManager.ask(msg, timeout);
+			Object result = Await.result(future, timeout);
+
+			String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) result).savepointPath();
+
+			// Cancel because of restarts
+			msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
+			Future<?> removedFuture = jobManager.ask(msg, timeout);
+
+			Future<?> cancelFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), timeout);
+			Object response = Await.result(cancelFuture, timeout);
+			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+			Await.ready(removedFuture, timeout);
+
+			// Adjust the job (we need a new operator ID)
+			JobVertex newSourceVertex = new JobVertex("Source");
+			newSourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+			newSourceVertex.setParallelism(1);
+
+			JobGraph newJobGraph = new JobGraph("TestingJob", newSourceVertex);
+
+			JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(newSourceVertex.getID()),
+					Collections.singletonList(newSourceVertex.getID()),
+					Collections.singletonList(newSourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE);
+
+			newJobGraph.setSnapshotSettings(newSnapshottingSettings);
+
+			SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savepointPath, false);
+			newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobResultFailure);
+
+			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
+			Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
+
+			assertTrue(cause instanceof IllegalStateException);
+			assertTrue(cause.getMessage().contains("allowNonRestoredState"));
+
+			// Wait until removed
+			msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
+
+			Await.ready(jobManager.ask(msg, timeout), timeout);
+
+			// Resubmit, but allow non restored state now
+			restoreSettings = SavepointRestoreSettings.forPath(savepointPath, true);
+			newJobGraph.setSavepointRestoreSettings(restoreSettings);
+
+			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			response = Await.result(jobManager.ask(msg, timeout), timeout);
+
+			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
 }