You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/05/15 11:11:59 UTC

[flink] branch master updated: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dfdaa4  [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
1dfdaa4 is described below

commit 1dfdaa417ab7cdca9bef1efe6381c7eb67022aaf
Author: yanghua <ya...@gmail.com>
AuthorDate: Fri May 10 19:55:27 2019 +0800

    [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
    
    Closes #8410
---
 docs/dev/stream/state/checkpointing.md             |   5 +
 docs/dev/stream/state/checkpointing.zh.md          |   5 +
 .../jobmanager/JMXJobManagerMetricTest.java        |   3 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |   8 +-
 .../checkpoint/CompletedCheckpointStore.java       |  37 ++-
 .../StandaloneCompletedCheckpointStore.java        |   5 -
 .../ZooKeeperCompletedCheckpointStore.java         |  10 -
 .../runtime/executiongraph/ExecutionGraph.java     |   6 +-
 .../executiongraph/ExecutionGraphBuilder.java      |   3 +-
 .../tasks/CheckpointCoordinatorConfiguration.java  |  16 +-
 .../CheckpointCoordinatorFailureTest.java          |   5 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |   5 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 270 ++++++++++++++++++---
 .../CheckpointSettingsSerializableTest.java        |   3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |   9 +-
 .../checkpoint/CheckpointStatsTrackerTest.java     |   1 +
 .../checkpoint/CompletedCheckpointStoreTest.java   |  10 +-
 .../ExecutionGraphCheckpointCoordinatorTest.java   |   5 +-
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |   8 +-
 ...oKeeperCompletedCheckpointStoreMockitoTest.java | 126 +++++++++-
 .../executiongraph/ArchivedExecutionGraphTest.java |   3 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |   6 +-
 .../ExecutionGraphDeploymentTest.java              |   1 +
 .../runtime/executiongraph/FailoverRegionTest.java |   7 +-
 .../flink/runtime/jobgraph/JobGraphTest.java       |   3 +-
 .../tasks/JobCheckpointingSettingsTest.java        |   1 +
 .../flink/runtime/jobmaster/JobMasterTest.java     |   9 +-
 .../RecoverableCompletedCheckpointStore.java       |   5 -
 .../api/environment/CheckpointConfig.java          |  21 ++
 .../api/graph/StreamingJobGraphGenerator.java      |   3 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   3 +-
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   3 +-
 32 files changed, 510 insertions(+), 95 deletions(-)

diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index eca8d5c..fab1a49 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:
 
   - *fail/continue task on checkpoint errors*: This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure. This is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
 
+  - *prefer checkpoint for recovery*: This determines if a job will fall back to the latest checkpoint even when more recent savepoints are available, which can potentially reduce recovery time.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -100,6 +102,9 @@ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+// allow job recovery to fall back to the latest checkpoint even when there is a more recent savepoint
+env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index eca8d5c..bcd32e0 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:
 
   - *fail/continue task on checkpoint errors*: This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure. This is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
 
+  - *prefer checkpoint for recovery*: This determines if a job will fall back to the latest checkpoint even when more recent savepoints are available, which can potentially reduce recovery time.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -125,6 +127,9 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
 
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+
+// allow job recovery to fall back to the latest checkpoint even when there is a more recent savepoint
+env.getCheckpointConfig().setPreferCheckpointForRecovery(true)
 {% endhighlight %}
 </div>
 </div>
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 036796c..b946896 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -100,7 +100,8 @@ public class JMXJobManagerMetricTest extends TestLogger {
 					50,
 					5,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-					true),
+					true,
+					false),
 				null));
 
 			ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c7f59a7..7da924c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -183,6 +183,8 @@ public class CheckpointCoordinator {
 	/** Registry that tracks state which is shared across (incremental) checkpoints. */
 	private SharedStateRegistry sharedStateRegistry;
 
+	private boolean isPreferCheckpointForRecovery;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -199,7 +201,8 @@ public class CheckpointCoordinator {
 			CompletedCheckpointStore completedCheckpointStore,
 			StateBackend checkpointStateBackend,
 			Executor executor,
-			SharedStateRegistryFactory sharedStateRegistryFactory) {
+			SharedStateRegistryFactory sharedStateRegistryFactory,
+			boolean isPreferCheckpointForRecovery) {
 
 		// sanity checks
 		checkNotNull(checkpointStateBackend);
@@ -233,6 +236,7 @@ public class CheckpointCoordinator {
 		this.executor = checkNotNull(executor);
 		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
 		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+		this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -1080,7 +1084,7 @@ public class CheckpointCoordinator {
 			LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
 
 			// Restore from the latest checkpoint
-			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
+			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
 
 			if (latest == null) {
 				if (errorIfNoCheckpoint) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 82193b5..1cda131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,18 +19,25 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.ListIterator;
 
 /**
  * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
  */
 public interface CompletedCheckpointStore {
 
+	Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);
+
 	/**
 	 * Recover available {@link CompletedCheckpoint} instances.
 	 *
-	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
+	 * <p>After a call to this method, {@link #getLatestCheckpoint(boolean)} returns the latest
 	 * available checkpoint.
 	 */
 	void recover() throws Exception;
@@ -47,7 +54,33 @@ public interface CompletedCheckpointStore {
 	 * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
 	 * added.
 	 */
-	CompletedCheckpoint getLatestCheckpoint() throws Exception;
+	default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
+		// Work on a single snapshot so that the emptiness check, the index lookup and the backward
+		// scan all observe the same list; an exception from getAllCheckpoints() propagates as-is.
+		final List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
+		if (allCheckpoints.isEmpty()) {
+			return null;
+		}
+
+		final CompletedCheckpoint latest = allCheckpoints.get(allCheckpoints.size() - 1);
+		if (!isPreferCheckpointForRecovery || !latest.getProperties().isSavepoint()) {
+			// Either savepoints are acceptable, or the newest entry already is a regular
+			// checkpoint; scanning further back here would wrongly select an older checkpoint.
+			return latest;
+		}
+
+		// The newest entry is a savepoint: walk backwards (starting just before it) to the most recent checkpoint.
+		ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
+		while (listIterator.hasPrevious()) {
+			CompletedCheckpoint prev = listIterator.previous();
+			if (!prev.getProperties().isSavepoint()) {
+				LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!");
+				return prev;
+			}
+		}
+		// No regular checkpoint is retained, so the latest savepoint is the only option.
+		return latest;
+	}
 
 	/**
 	 * Shuts down the store.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 63e7468..eca7e92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -77,11 +77,6 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
-	public CompletedCheckpoint getLatestCheckpoint() {
-		return checkpoints.isEmpty() ? null : checkpoints.getLast();
-	}
-
-	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() {
 		return new ArrayList<>(checkpoints);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 12cc8eb..e4001b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -244,16 +244,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	}
 
 	@Override
-	public CompletedCheckpoint getLatestCheckpoint() {
-		if (completedCheckpoints.isEmpty()) {
-			return null;
-		}
-		else {
-			return completedCheckpoints.peekLast();
-		}
-	}
-
-	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
 		List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
 		return checkpoints;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6a4af57..a825860 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -524,7 +524,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			StateBackend checkpointStateBackend,
-			CheckpointStatsTracker statsTracker) {
+			CheckpointStatsTracker statsTracker,
+			boolean isPreferCheckpointForRecovery) {
 
 		// simple sanity checks
 		checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
@@ -554,7 +555,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			checkpointStore,
 			checkpointStateBackend,
 			ioExecutor,
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			isPreferCheckpointForRecovery);
 
 		// register the master hooks on the checkpoint coordinator
 		for (MasterTriggerRestoreHook<?> hook : masterHooks) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index fc63e4b..117b7b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -301,7 +301,8 @@ public class ExecutionGraphBuilder {
 				checkpointIdCounter,
 				completedCheckpoints,
 				rootBackend,
-				checkpointStatsTracker);
+				checkpointStatsTracker,
+				chkConfig.isPreferCheckpointForRecovery());
 		}
 
 		// create all the metrics for the Execution Graph
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 4ecbda5..9e57d12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -54,13 +54,16 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 	 */
 	private final boolean isExactlyOnce;
 
+	private final boolean isPreferCheckpointForRecovery;
+
 	public CheckpointCoordinatorConfiguration(
 			long checkpointInterval,
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			CheckpointRetentionPolicy checkpointRetentionPolicy,
-			boolean isExactlyOnce) {
+			boolean isExactlyOnce,
+			boolean isPerfetCheckpointForRecovery) {
 
 		// sanity checks
 		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
@@ -74,6 +77,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 		this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
 		this.isExactlyOnce = isExactlyOnce;
+		this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery;
 	}
 
 	public long getCheckpointInterval() {
@@ -100,6 +104,10 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 		return isExactlyOnce;
 	}
 
+	public boolean isPreferCheckpointForRecovery() {
+		return isPreferCheckpointForRecovery;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -114,7 +122,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
 			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
 			isExactlyOnce == that.isExactlyOnce &&
-			checkpointRetentionPolicy == that.checkpointRetentionPolicy;
+			checkpointRetentionPolicy == that.checkpointRetentionPolicy &&
+			isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery;
 	}
 
 	@Override
@@ -125,7 +134,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 				minPauseBetweenCheckpoints,
 				maxConcurrentCheckpoints,
 				checkpointRetentionPolicy,
-				isExactlyOnce);
+				isExactlyOnce,
+				isPreferCheckpointForRecovery);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 32b32cf..aac54b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -80,7 +80,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new FailingCompletedCheckpointStore(),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -142,7 +143,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		}
 
 		@Override
-		public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+		public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index f644c01..949947a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -206,7 +206,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 		assertEquals(0, cc.getNumberOfPendingCheckpoints());
 
 		assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
-		final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
+		final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(false);
 
 		final Collection<MasterState> masterStates = chk.getMasterHookStates();
 		assertEquals(2, masterStates.size());
@@ -435,7 +435,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new StandaloneCompletedCheckpointStore(10),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index e1f8f9c..5af7cf6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -55,14 +55,19 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializableObject;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.hamcrest.MockitoHamcrest;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
@@ -77,6 +82,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -143,7 +149,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -204,7 +211,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -256,7 +264,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -309,7 +318,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -412,7 +422,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -532,7 +543,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -700,7 +712,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -831,7 +844,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(10),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -996,7 +1010,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1074,7 +1089,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1138,7 +1154,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1271,7 +1288,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 
 			coord.startCheckpointScheduler();
@@ -1361,7 +1379,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 		try {
 			coord.startCheckpointScheduler();
@@ -1435,7 +1454,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1587,7 +1607,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(10),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1681,7 +1702,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			coord.startCheckpointScheduler();
 
@@ -1755,7 +1777,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			coord.startCheckpointScheduler();
 
@@ -1832,7 +1855,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(2),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			coord.startCheckpointScheduler();
 
@@ -1885,7 +1909,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(2),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1939,7 +1964,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(2),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2002,7 +2028,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			store,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2117,7 +2144,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2197,6 +2225,149 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testRestoreLatestCheckpointWhenPreferCheckpoint() throws Exception {
+		testRestoreLatestCheckpointIsPreferSavepoint(true);
+	}
+
+	@Test
+	public void testRestoreLatestCheckpointWhenPreferSavepoint() throws Exception {
+		testRestoreLatestCheckpointIsPreferSavepoint(false);
+	}
+
+	private void testRestoreLatestCheckpointIsPreferSavepoint(boolean isPreferCheckpoint) {
+		try {
+			final JobID jid = new JobID();
+			long timestamp = System.currentTimeMillis();
+			StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+			final JobVertexID statefulId = new JobVertexID();
+			final JobVertexID statelessId = new JobVertexID();
+
+			Execution statefulExec1 = mockExecution();
+			Execution statelessExec1 = mockExecution();
+
+			ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0, 1);
+			ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0, 1);
+
+			ExecutionJobVertex stateful = mockExecutionJobVertex(statefulId,
+				new ExecutionVertex[] { stateful1 });
+			ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
+				new ExecutionVertex[] { stateless1 });
+
+			Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
+			map.put(statefulId, stateful);
+			map.put(statelessId, stateless);
+
+			CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(2);
+
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+				jid,
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				new ExecutionVertex[] { stateful1, stateless1 },
+				new ExecutionVertex[] { stateful1, stateless1 },
+				new ExecutionVertex[] { stateful1, stateless1 },
+				checkpointIDCounter,
+				store,
+				new MemoryStateBackend(),
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY,
+				isPreferCheckpoint);
+
+			// trigger a checkpoint and acknowledge it so that it becomes a completed checkpoint
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+			long checkpointId = checkpointIDCounter.getLast();
+
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+			List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
+			KeyedStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
+
+			TaskStateSnapshot subtaskStatesForCheckpoint = new TaskStateSnapshot();
+
+			subtaskStatesForCheckpoint.putSubtaskStateByOperatorID(
+				OperatorID.fromJobVertexID(statefulId),
+				new OperatorSubtaskState(
+					StateObjectCollection.empty(),
+					StateObjectCollection.empty(),
+					StateObjectCollection.singleton(serializedKeyGroupStates),
+					StateObjectCollection.empty()));
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForCheckpoint));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+
+			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+			assertEquals(jid, success.getJobId());
+
+			// trigger a savepoint and acknowledge it so that it finishes
+			String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+			timestamp = System.currentTimeMillis();
+			CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
+
+
+			KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1);
+			List<SerializableObject> testStatesForSavepoint = Collections.singletonList(new SerializableObject());
+			KeyedStateHandle serializedKeyGroupStatesForSavepoint = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint);
+
+			TaskStateSnapshot subtaskStatesForSavepoint = new TaskStateSnapshot();
+
+			subtaskStatesForSavepoint.putSubtaskStateByOperatorID(
+				OperatorID.fromJobVertexID(statefulId),
+				new OperatorSubtaskState(
+					StateObjectCollection.empty(),
+					StateObjectCollection.empty(),
+					StateObjectCollection.singleton(serializedKeyGroupStatesForSavepoint),
+					StateObjectCollection.empty()));
+
+			checkpointId = checkpointIDCounter.getLast();
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForSavepoint));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+
+			assertTrue(savepointFuture.isDone());
+
+			// restore; the latest savepoint is expected to be skipped only when isPreferCheckpoint is true
+			coord.restoreLatestCheckpointedState(map, true, false);
+
+			// verify that the restored state is the checkpoint's or the savepoint's subtask states, depending on isPreferCheckpoint
+			BaseMatcher<JobManagerTaskRestore> matcher = new BaseMatcher<JobManagerTaskRestore>() {
+				@Override
+				public boolean matches(Object o) {
+					if (o instanceof JobManagerTaskRestore) {
+						JobManagerTaskRestore taskRestore = (JobManagerTaskRestore) o;
+						if (isPreferCheckpoint) {
+							return Objects.equals(taskRestore.getTaskStateSnapshot(), subtaskStatesForCheckpoint);
+						} else {
+							return Objects.equals(taskRestore.getTaskStateSnapshot(), subtaskStatesForSavepoint);
+						}
+					}
+					return false;
+				}
+
+				@Override
+				public void describeTo(Description description) {
+					if (isPreferCheckpoint) {
+						description.appendValue(subtaskStatesForCheckpoint);
+					} else {
+						description.appendValue(subtaskStatesForSavepoint);
+					}
+				}
+			};
+
+			verify(statefulExec1, times(1)).setInitialState(MockitoHamcrest.argThat(matcher));
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<JobManagerTaskRestore>any());
+
+			coord.shutdown(JobStatus.FINISHED);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
 		testStateRecoveryWithTopologyChange(0);
 	}
@@ -2264,7 +2435,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2532,7 +2704,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 				new TestCompletedCheckpointStorageLocation());
 
-		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
+		when(standaloneCompletedCheckpointStore.getLatestCheckpoint(false)).thenReturn(completedCheckpoint);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2549,7 +2721,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			standaloneCompletedCheckpointStore,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2701,7 +2874,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -3136,7 +3310,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		// Periodic
 		try {
@@ -3357,7 +3532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3396,7 +3572,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			store,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		store.addCheckpoint(new CompletedCheckpoint(
 			new JobID(),
@@ -3463,7 +3640,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 					SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
 					createdSharedStateRegistries.add(instance);
 					return instance;
-				});
+				},
+			false);
 
 		final int numCheckpoints = 3;
 
@@ -3654,4 +3832,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
 	}
+
+	private Execution mockExecution() {
+		Execution mock = mock(Execution.class);
+		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
+		when(mock.getState()).thenReturn(ExecutionState.RUNNING);
+		return mock;
+	}
+
+	private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
+		ExecutionVertex mock = mock(ExecutionVertex.class);
+		when(mock.getJobvertexId()).thenReturn(vertexId);
+		when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
+		when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
+		when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
+		when(mock.getMaxParallelism()).thenReturn(parallelism);
+		return mock;
+	}
+
+	private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(vertices.length);
+		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
+		when(vertex.getJobVertexId()).thenReturn(id);
+		when(vertex.getTaskVertices()).thenReturn(vertices);
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
+		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+
+		for (ExecutionVertex v : vertices) {
+			when(v.getJobVertex()).thenReturn(vertex);
+		}
+		return vertex;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 780c0fe..bf438aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -90,7 +90,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 					0L,
 					1,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-					true),
+					true,
+					false),
 				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
 				serHooks);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 236717f..c441309 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -109,7 +109,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -187,7 +188,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCompletedCheckpointStore(1),
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
-				SharedStateRegistry.DEFAULT_FACTORY);
+				SharedStateRegistry.DEFAULT_FACTORY,
+				false);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -245,7 +247,8 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
+			SharedStateRegistry.DEFAULT_FACTORY,
+			false);
 
 		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
 		Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 82dcd02..fce0f3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -64,6 +64,7 @@ public class CheckpointStatsTrackerTest {
 				191929L,
 				123,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				false,
 				false
 			),
 			null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index c4d8903..bb4350a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -79,11 +79,11 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		// Add and get latest
 		checkpoints.addCheckpoint(expected[0]);
 		assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
-		verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint());
+		verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint(false));
 
 		checkpoints.addCheckpoint(expected[1]);
 		assertEquals(2, checkpoints.getNumberOfRetainedCheckpoints());
-		verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint());
+		verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint(false));
 	}
 
 	/**
@@ -119,7 +119,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	/**
 	 * Tests that
 	 * <ul>
-	 * <li>{@link CompletedCheckpointStore#getLatestCheckpoint()} returns <code>null</code>,</li>
+	 * <li>{@link CompletedCheckpointStore#getLatestCheckpoint(boolean)} returns <code>null</code>,</li>
 	 * <li>{@link CompletedCheckpointStore#getAllCheckpoints()} returns an empty list,</li>
 	 * <li>{@link CompletedCheckpointStore#getNumberOfRetainedCheckpoints()} returns 0.</li>
 	 * </ul>
@@ -128,7 +128,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	public void testEmptyState() throws Exception {
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
 
-		assertNull(checkpoints.getLatestCheckpoint());
+		assertNull(checkpoints.getLatestCheckpoint(false));
 		assertEquals(0, checkpoints.getAllCheckpoints().size());
 		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
 	}
@@ -179,7 +179,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		checkpoints.shutdown(JobStatus.FINISHED);
 
 		// Empty state
-		assertNull(checkpoints.getLatestCheckpoint());
+		assertNull(checkpoints.getLatestCheckpoint(false));
 		assertEquals(0, checkpoints.getAllCheckpoints().size());
 		assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index f9f50fe..bdb9975 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -166,7 +166,8 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 				counter,
 				store,
 				new MemoryStateBackend(),
-				CheckpointStatsTrackerTest.createTestTracker());
+				CheckpointStatsTrackerTest.createTestTracker(),
+				false);
 
 		JobVertex jobVertex = new JobVertex("MockVertex");
 		jobVertex.setInvokableClass(AbstractInvokable.class);
@@ -222,7 +223,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 		}
 
 		@Override
-		public CompletedCheckpoint getLatestCheckpoint() {
+		public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index c0c1877..af6d860 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -119,7 +119,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
-		assertEquals(expected[2], checkpoints.getLatestCheckpoint());
+		assertEquals(expected[2], checkpoints.getLatestCheckpoint(false));
 
 		List<CompletedCheckpoint> expectedCheckpoints = new ArrayList<>(3);
 		expectedCheckpoints.add(expected[1]);
@@ -193,7 +193,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		sharedStateRegistry.close();
 		store.recover();
 
-		CompletedCheckpoint recovered = store.getLatestCheckpoint();
+		CompletedCheckpoint recovered = store.getLatestCheckpoint(false);
 		assertEquals(checkpoint, recovered);
 	}
 
@@ -220,7 +220,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		sharedStateRegistry.close();
 		checkpointStore.recover();
 
-		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
+		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(false);
 
 		assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
 	}
@@ -251,7 +251,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		sharedStateRegistry = new SharedStateRegistry();
 		zkCheckpointStore2.recover();
 
-		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
+		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(false);
 		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
 		TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index c8fcfb9..44437b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -164,7 +164,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 
 		zooKeeperCompletedCheckpointStore.recover();
 
-		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
+		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false);
 
 		// check that we return the latest retrievable checkpoint
 		// this should remove the latest checkpoint because it is broken
@@ -191,6 +191,130 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that the completed checkpoint store prefers the latest regular checkpoint over a newer
+	 * savepoint when asked to, and ignores entries which cannot be retrieved via their state handles.
+	 *
+	 * <p>We have a timeout in case the ZooKeeper store gets into a deadlock/livelock situation.
+	 */
+	@Test(timeout = 50000)
+	public void testCheckpointRecoveryPreferCheckpoint() throws Exception {
+		final JobID jobID = new JobID();
+		final long checkpoint1Id = 1L;
+		final long checkpoint2Id = 2;
+		final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+		expectedCheckpointIds.add(1L);
+		expectedCheckpointIds.add(2L);
+
+		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle1.retrieveState()).then(
+			(invocation) -> new CompletedCheckpoint(
+				jobID,
+				checkpoint1Id,
+				1L,
+				1L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new TestCompletedCheckpointStorageLocation()));
+
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+		when(retrievableStateHandle2.retrieveState()).then(
+			(invocation -> new CompletedCheckpoint(
+				jobID,
+				checkpoint2Id,
+				2L,
+				2L,
+				new HashMap<>(),
+				null,
+				CheckpointProperties.forSavepoint(),
+				new TestCompletedCheckpointStorageLocation())));
+
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
+		checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
+		checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
+
+		final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();
+
+		final int numCheckpointsToRetain = 1;
+
+		// Mocking for the delete operation on the CuratorFramework client
+		// It assures that the callback is executed synchronously
+
+		final EnsurePath ensurePathMock = mock(EnsurePath.class);
+		final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
+		when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
+		when(curatorEventMock.getResultCode()).thenReturn(0);
+		when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
+
+		when(
+			client
+				.delete()
+				.inBackground(any(BackgroundCallback.class), any(Executor.class))
+		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
+			@Override
+			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
+
+				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
+
+				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
+					@Override
+					public Void answer(InvocationOnMock invocation) throws Throwable {
+
+						callback.processResult(client, curatorEventMock);
+
+						return null;
+					}
+				});
+
+				return result;
+			}
+		});
+
+		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+			numCheckpointsToRetain,
+			zooKeeperStateHandleStoreMock,
+			Executors.directExecutor());
+
+		zooKeeperCompletedCheckpointStore.recover();
+
+		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(true);
+
+		// check that we return the latest retrievable regular checkpoint, not the newer savepoint
+		// this should remove the latest checkpoint because it is broken
+		assertEquals(checkpoint1Id, latestCompletedCheckpoint.getCheckpointID());
+
+		// this should remove the second broken checkpoint because we're iterating over all checkpoints
+		List<CompletedCheckpoint> completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+
+		Collection<Long> actualCheckpointIds = new HashSet<>(completedCheckpoints.size());
+
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+		}
+
+		assertEquals(expectedCheckpointIds, actualCheckpointIds);
+
+		// check that we did not discard any of the state handles
+		verify(retrievableStateHandle1, never()).discardState();
+		verify(retrievableStateHandle2, never()).discardState();
+
+		// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
+		// are subsumed should they be discarded.
+		verify(failingRetrievableStateHandle, never()).discardState();
+	}
+
+	/**
 	 * Tests that the checkpoint does not exist in the store when we fail to add
 	 * it into the store (i.e., there exists an exception thrown by the method).
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 98a1e16..4098643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -140,7 +140,8 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
-			statsTracker);
+			statsTracker,
+			false);
 
 		runtimeGraph.setJsonPlan("{}");
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 2991573..2002b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -362,7 +362,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
 			1L,
 			3,
 			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true);
+			true,
+			false);
 
 		final ExecutionGraph graph = createSampleGraph(
 			jid,
@@ -401,7 +402,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
 				1,
 				allVertices,
 				checkpointCoordinatorConfiguration,
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()),
+			false);
 
 		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 09ef74b..df80d3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -801,6 +801,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 					0,
 					1,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+					false,
 					false),
 				null));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 55890d7..f52eab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -114,7 +114,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		// verify checkpoint has been completed successfully.
 		assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints());
-		assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID());
+		assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint(false).getCheckpointID());
 
 		ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
 		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState());
@@ -535,7 +535,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		// verify checkpoint has been restored successfully.
 		assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints());
-		assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID());
+		assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint(false).getCheckpointID());
 	}
 
 	private ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
@@ -622,7 +622,8 @@ public class FailoverRegionTest extends TestLogger {
 					0,
 					jobVertices,
 					mock(CheckpointCoordinatorConfiguration.class),
-					new UnregisteredMetricsGroup()));
+					new UnregisteredMetricsGroup()),
+				false);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index adb6f5e..d778cf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -346,7 +346,8 @@ public class JobGraphTest extends TestLogger {
 			Long.MAX_VALUE,
 			Integer.MAX_VALUE,
 			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true);
+			true,
+			false);
 
 		return new JobCheckpointingSettings(
 			Collections.emptyList(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 51e7fec..4665a63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -49,6 +49,7 @@ public class JobCheckpointingSettingsTest {
 				112,
 				12,
 				CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
+				false,
 				false),
 			new SerializedValue<>(new MemoryStateBackend()));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index ff9d6ed..6712570 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -490,7 +490,7 @@ public class JobMasterTest extends TestLogger {
 
 		try {
 			// starting the JobMaster should have read the savepoint
-			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint();
+			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
@@ -551,7 +551,7 @@ public class JobMasterTest extends TestLogger {
 
 		try {
 			// starting the JobMaster should have read the savepoint
-			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint();
+			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
@@ -602,7 +602,7 @@ public class JobMasterTest extends TestLogger {
 
 		try {
 			// starting the JobMaster should have read the savepoint
-			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint();
+			final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
@@ -1837,7 +1837,8 @@ public class JobMasterTest extends TestLogger {
 			1000L,
 			1,
 			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-			true);
+			true,
+			false);
 		final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
 			Collections.emptyList(),
 			Collections.emptyList(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 037ecd1..a80b137 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -75,11 +75,6 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 	}
 
 	@Override
-	public CompletedCheckpoint getLatestCheckpoint() throws Exception {
-		return checkpoints.isEmpty() ? null : checkpoints.getLast();
-	}
-
-	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
 		if (jobStatus.isGloballyTerminalState()) {
 			checkpoints.clear();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 87c800d..8b9ad55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -72,6 +72,9 @@ public class CheckpointConfig implements java.io.Serializable {
 	/** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */
 	private boolean failOnCheckpointingErrors = true;
 
+	/** Determines if a job will fall back to a checkpoint when there is a more recent savepoint. */
+	private boolean preferCheckpointForRecovery = false;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -286,6 +289,24 @@ public class CheckpointConfig implements java.io.Serializable {
 	}
 
 	/**
+	 * Returns whether job recovery should fall back to a checkpoint when there is a more recent savepoint.
+	 *
+	 * @return <code>true</code> if job recovery should fall back to a checkpoint.
+	 */
+	@PublicEvolving
+	public boolean isPreferCheckpointForRecovery() {
+		return preferCheckpointForRecovery;
+	}
+
+	/**
+	 * Sets whether job recovery should fall back to a checkpoint when there is a more recent savepoint.
+	 */
+	@PublicEvolving
+	public void setPreferCheckpointForRecovery(boolean preferCheckpointForRecovery) {
+		this.preferCheckpointForRecovery = preferCheckpointForRecovery;
+	}
+
+	/**
 	 * Returns the cleanup behaviour for externalized checkpoints.
 	 *
 	 * @return The cleanup behaviour for externalized checkpoints or
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 05d0626..fdb38f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -700,7 +700,8 @@ public class StreamingJobGraphGenerator {
 				cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				retentionAfterTermination,
-				isExactlyOnce),
+				isExactlyOnce,
+				cfg.isPreferCheckpointForRecovery()),
 			serializedStateBackend,
 			serializedHooks);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index f3d904e..986e410 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -244,7 +244,8 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 						10,
 						1,
 						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-						true),
+						true,
+						false),
 				null));
 
 		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 75e9ed1..6635e31 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -106,7 +106,8 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase {
 				10,
 				1,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-				true),
+				true,
+				false),
 			null));
 
 		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());