You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:20 UTC

[03/17] flink git commit: [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
new file mode 100644
index 0000000..5f4b954
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+
+/**
+ * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
+ * persists the metadata in a file system.
+ *
+ * <p>This class inherits its behavior for metadata from the {@link FsCheckpointStorageLocation},
+ * which makes checkpoint metadata cross compatible between the two classes and hence between
+ * the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} and the
+ * {@link MemoryStateBackend}.
+ */
+public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointStorageLocation {
+
+	/** The internal pointer for the {@link MemoryStateBackend}'s storage location (data inline with
+	 * state handles) that gets sent to the TaskManagers to describe this storage. */
+	static final String LOCATION_POINTER = "(embedded)";
+
+	/**
+	 * Creates a checkpoint storage persists metadata to a file system and stores state
+	 * in line in state handles with the metadata.
+	 *
+	 * @param fileSystem The file system to which the metadata will be written.
+	 * @param checkpointDir The directory where the checkpoint metadata will be written.
+	 */
+	public PersistentMetadataCheckpointStorageLocation(FileSystem fileSystem, Path checkpointDir) {
+		super(fileSystem, checkpointDir, checkpointDir, checkpointDir);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getLocationAsPointer() {
+		return LOCATION_POINTER;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return getClass().getName() + " @ " + getCheckpointDirectory();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0f8033d..325e955 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager}
 import org.apache.flink.runtime.clusterframework.messages._
@@ -552,31 +551,31 @@ class JobManager(
 
     case CancelJobWithSavepoint(jobId, savepointDirectory) =>
       try {
-        val targetDirectory = if (savepointDirectory != null) {
-          savepointDirectory
-        } else {
-          defaultSavepointDir
-        }
-
-        if (targetDirectory == null) {
-          log.info(s"Trying to cancel job $jobId with savepoint, but no " +
-            "savepoint directory configured.")
+        log.info(s"Trying to cancel job $jobId with savepoint to $savepointDirectory")
 
-          sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
-            "No savepoint directory configured. You can either specify a directory " +
-              "while cancelling via -s :targetDirectory or configure a cluster-wide " +
-              "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
-        } else {
-          log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+        currentJobs.get(jobId) match {
+          case Some((executionGraph, _)) =>
+            val coord = executionGraph.getCheckpointCoordinator
 
-          currentJobs.get(jobId) match {
-            case Some((executionGraph, _)) =>
+            if (coord == null) {
+              sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+                s"Job $jobId is not a streaming job.")))
+            }
+            else if (savepointDirectory == null &&
+                  !coord.getCheckpointStorage.hasDefaultSavepointLocation) {
+              log.info(s"Trying to cancel job $jobId with savepoint, but no " +
+                "savepoint directory configured.")
+
+              sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+                "No savepoint directory configured. You can either specify a directory " +
+                  "while cancelling via -s :targetDirectory or configure a cluster-wide " +
+                  "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
+            } else {
               // We don't want any checkpoint between the savepoint and cancellation
-              val coord = executionGraph.getCheckpointCoordinator
               coord.stopCheckpointScheduler()
 
               // Trigger the savepoint
-              val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+              val future = coord.triggerSavepoint(System.currentTimeMillis(), savepointDirectory)
 
               val senderRef = sender()
               future.handleAsync[Void](
@@ -608,15 +607,15 @@ class JobManager(
                   }
                 },
                 context.dispatcher)
+            }
 
-            case None =>
-              log.info(s"No job found with ID $jobId.")
-              sender ! decorateMessage(
-                CancellationFailure(
-                  jobId,
-                  new IllegalArgumentException(s"No job found with ID $jobId."))
-              )
-          }
+          case None =>
+            log.info(s"No job found with ID $jobId.")
+            sender ! decorateMessage(
+              CancellationFailure(
+                jobId,
+                new IllegalArgumentException(s"No job found with ID $jobId."))
+            )
         }
       } catch {
         case t: Throwable =>
@@ -746,25 +745,28 @@ class JobManager(
         case Some((graph, _)) =>
           val checkpointCoordinator = graph.getCheckpointCoordinator()
 
-          if (checkpointCoordinator != null) {
+          if (checkpointCoordinator == null) {
+            sender ! decorateMessage(TriggerSavepointFailure(jobId, new IllegalStateException(
+              s"Job $jobId is not a streaming job.")))
+          }
+          else if (savepointDirectory.isEmpty &&
+                !checkpointCoordinator.getCheckpointStorage.hasDefaultSavepointLocation) {
+            log.info(s"Trying to trigger a savepoint, but no savepoint directory configured.")
+
+            sender ! decorateMessage(TriggerSavepointFailure(jobId, new IllegalStateException(
+              "No savepoint directory configured. You can either specify a directory " +
+                "when triggering the savepoint via -s :targetDirectory or configure a " +
+                "cluster-/application-wide default via key '" +
+                CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
+          } else {
             // Immutable copy for the future
             val senderRef = sender()
             try {
-              val targetDirectory : String = savepointDirectory.getOrElse(
-                flinkConfiguration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))
-
-              if (targetDirectory == null) {
-                throw new IllegalStateException("No savepoint directory configured. " +
-                  "You can either specify a directory when triggering this savepoint or " +
-                  "configure a cluster-wide default via key '" +
-                  CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")
-              }
-
               // Do this async, because checkpoint coordinator operations can
               // contain blocking calls to the state backend or ZooKeeper.
               val savepointFuture = checkpointCoordinator.triggerSavepoint(
                 System.currentTimeMillis(),
-                targetDirectory)
+                savepointDirectory.orNull)
 
               savepointFuture.handleAsync[Void](
                 new BiFunction[CompletedCheckpoint, Throwable, Void] {
@@ -794,10 +796,6 @@ class JobManager(
                 senderRef ! TriggerSavepointFailure(jobId, new Exception(
                   "Failed to trigger savepoint", e))
             }
-          } else {
-            sender() ! TriggerSavepointFailure(jobId, new IllegalStateException(
-              "Checkpointing disabled. You can enable it via the execution environment of " +
-                "your job."))
           }
 
         case None =>
@@ -809,19 +807,13 @@ class JobManager(
       future {
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
-          //TODO user code class loader ?
-          // (has not been used so far and new savepoints can simply be deleted by file)
-          val savepoint = SavepointStore.loadSavepoint(
-            savepointPath,
-            Thread.currentThread().getContextClassLoader)
-
-          log.debug(s"$savepoint")
 
-          // Dispose checkpoint state
-          savepoint.dispose()
+          // there is a corner case issue with Flink 1.1 savepoints, which may contain
+          // user-defined state handles. however, it should work for all the standard cases,
+          // where the mem/fs/rocks state backends were used
+          val classLoader = Thread.currentThread().getContextClassLoader
 
-          // Remove the header file
-          SavepointStore.removeSavepointFile(savepointPath)
+          Checkpoints.disposeSavepoint(savepointPath, flinkConfiguration, classLoader, log.logger)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
deleted file mode 100644
index f55e0d3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * CheckpointCoordinator tests for externalized checkpoints.
- *
- * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
- * test is already huge and covers many different configurations.
- */
-public class CheckpointCoordinatorExternalizedCheckpointsTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	/**
-	 * Triggers multiple externalized checkpoints and verifies that the metadata
-	 * files have been created.
-	 */
-	@Test
-	public void testTriggerAndConfirmSimpleExternalizedCheckpoint()
-		throws Exception {
-		final JobID jid = new JobID();
-
-		final ExternalizedCheckpointSettings externalizedCheckpointSettings =
-			ExternalizedCheckpointSettings.externalizeCheckpoints(false);
-
-		final File checkpointDir = tmp.newFolder();
-		final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
-
-		// create some mock Execution vertices that receive the checkpoint trigger messages
-		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
-		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
-		ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
-		ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
-
-		Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
-		jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
-		jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
-
-		// set up the coordinator and validate the initial state
-		CheckpointCoordinator coord = new CheckpointCoordinator(
-			jid,
-			600000,
-			600000,
-			0,
-			Integer.MAX_VALUE,
-			externalizedCheckpointSettings,
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new ExecutionVertex[] { vertex1, vertex2 },
-			new StandaloneCheckpointIDCounter(),
-			new StandaloneCompletedCheckpointStore(1),
-			checkpointDir.getAbsolutePath(),
-			stateBackend,
-			Executors.directExecutor(),
-			SharedStateRegistry.DEFAULT_FACTORY);
-
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
-
-		// ---------------
-		// trigger checkpoint 1
-		// ---------------
-
-		{
-			final long timestamp1 = System.currentTimeMillis();
-
-			coord.triggerCheckpoint(timestamp1, false);
-
-			long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
-
-			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
-
-			verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1);
-			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
-		}
-
-		// ---------------
-		// trigger checkpoint 2
-		// ---------------
-
-		{
-			final long timestamp2 = System.currentTimeMillis() + 7;
-			coord.triggerCheckpoint(timestamp2, false);
-
-			long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
-
-			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
-			verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2);
-			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
-		}
-
-		// ---------------
-		// trigger checkpoint 3
-		// ---------------
-
-		{
-			final long timestamp3 = System.currentTimeMillis() + 146;
-			coord.triggerCheckpoint(timestamp3, false);
-
-			long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
-
-			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
-			verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3);
-			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
-		}
-
-		coord.shutdown(JobStatus.FINISHED);
-	}
-
-	/**
-	 * Verifies an externalized completed checkpoint instance.
-	 *
-	 * <p>The provided JobID, checkpoint ID, timestamp need to match. Also, the
-	 * external pointer and external metadata need to be notNull and exist (currently
-	 * assuming that they are file system based).
-	 *
-	 * @param checkpoint Completed checkpoint to check.
-	 * @param jid JobID of the job the checkpoint belongs to.
-	 * @param checkpointId Checkpoint ID of the checkpoint to check.
-	 * @param timestamp Timestamp of the checkpoint to check.
-	 */
-	private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
-		assertEquals(jid, checkpoint.getJobId());
-		assertEquals(checkpointId, checkpoint.getCheckpointID());
-		assertEquals(timestamp, checkpoint.getTimestamp());
-		assertNotNull(checkpoint.getExternalPointer());
-		assertNotNull(checkpoint.getExternalizedMetadata());
-		FileStateHandle fsHandle = (FileStateHandle) checkpoint.getExternalizedMetadata();
-		assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
-	}
-
-	private static void verifyExternalizedCheckpointRestore(
-			CompletedCheckpoint checkpoint,
-			Map<JobVertexID, ExecutionJobVertex> jobVertices,
-			ExecutionVertex... vertices) throws IOException {
-
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(
-				checkpoint.getJobId(),
-				jobVertices,
-				checkpoint.getExternalPointer(),
-				Thread.currentThread().getContextClassLoader(),
-				false);
-
-		for (ExecutionVertex vertex : vertices) {
-			for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
-				assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index add7447..cbfe0ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -72,13 +71,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[]{vertex},
 			new ExecutionVertex[]{vertex},
 			new ExecutionVertex[]{vertex},
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e53bf3a..d583a0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -26,12 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
 import org.junit.Test;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -229,9 +230,9 @@ public class CheckpointCoordinatorMasterHooksTest {
 				jid, checkpointId, 123L, 125L,
 				Collections.<OperatorID, OperatorState>emptyMap(),
 				masterHookStates,
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				null);
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new EmptyStreamStateHandle(),
+				"pointer");
 
 		final ExecutionAttemptID execId = new ExecutionAttemptID();
 		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
@@ -283,9 +284,9 @@ public class CheckpointCoordinatorMasterHooksTest {
 				jid, checkpointId, 123L, 125L,
 				Collections.<OperatorID, OperatorState>emptyMap(),
 				masterHookStates,
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				null);
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new EmptyStreamStateHandle(),
+				"pointer");
 
 		final ExecutionAttemptID execId = new ExecutionAttemptID();
 		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
@@ -396,13 +397,12 @@ public class CheckpointCoordinatorMasterHooksTest {
 				600000L,
 				0L,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[0],
 				ackVertices,
 				new ExecutionVertex[0],
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 609e91c..9f9659a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -32,10 +32,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -135,13 +135,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 				new ExecutionVertex[] { ackVertex1, ackVertex2 },
 				new ExecutionVertex[] {},
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -197,13 +196,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 				new ExecutionVertex[] { ackVertex1, ackVertex2 },
 				new ExecutionVertex[] {},
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -250,13 +248,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 				new ExecutionVertex[] { ackVertex1, ackVertex2 },
 				new ExecutionVertex[] {},
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -304,13 +301,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -408,13 +404,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -529,13 +524,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -698,13 +692,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -830,13 +823,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 				new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -996,13 +988,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex1, ackVertex2 },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1075,13 +1066,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex1, ackVertex2 },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1140,13 +1130,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			20000L,
 			0L,
 			1,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { triggerVertex },
 			new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
 			new ExecutionVertex[0],
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -1274,13 +1263,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200000,    // timeout is very long (200 s)
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1365,13 +1353,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200_000,     // timeout is very long (200 s)
 				delay,       // 50 ms delay between checkpoints
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { vertex },
 				new ExecutionVertex[] { vertex },
 				new ExecutionVertex[] { vertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				"dummy-path",
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1440,13 +1427,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { vertex1, vertex2 },
 			new ExecutionVertex[] { vertex1, vertex2 },
 			new ExecutionVertex[] { vertex1, vertex2 },
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -1593,13 +1579,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { vertex1, vertex2 },
 			new ExecutionVertex[] { vertex1, vertex2 },
 			new ExecutionVertex[] { vertex1, vertex2 },
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -1695,13 +1680,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200000,    // timeout is very long (200 s)
 				0L,        // no extra delay
 				maxConcurrentAttempts,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1770,13 +1754,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200000,    // timeout is very long (200 s)
 				0L,        // no extra delay
 				maxConcurrentAttempts, // max two concurrent checkpoints
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1848,13 +1831,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				200000,    // timeout is very long (200 s)
 				0L,        // no extra delay
 				2, // max two concurrent checkpoints
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { triggerVertex },
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -1902,13 +1884,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			200000,
 			0L,
 			1, // max one checkpoint at a time => should not affect savepoints
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -1957,13 +1938,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			200000,
 			100000000L, // very long min delay => should not affect savepoints
 			1,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -2021,13 +2001,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			new StandaloneCheckpointIDCounter(),
 			store,
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -2137,13 +2116,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -2285,13 +2263,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -2436,7 +2413,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	 */
 	public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
 
-		/**
+		/*
 		 * Old topology
 		 * CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * parallelism2
 		 */
@@ -2510,7 +2487,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			}
 		}
 
-		/**
+		/*
 		 * New topology
 		 * CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> op6) * newParallelism2
 		 */
@@ -2557,9 +2534,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				System.currentTimeMillis() + 3000,
 				operatorStates,
 				Collections.<MasterState>emptyList(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				null);
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				new EmptyStreamStateHandle(),
+				"some/mock/pointer");
 
 		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
 
@@ -2570,13 +2547,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			newJobVertex1.getTaskVertices(),
 			newJobVertex1.getTaskVertices(),
 			newJobVertex1.getTaskVertices(),
 			new StandaloneCheckpointIDCounter(),
 			standaloneCompletedCheckpointStore,
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -2719,13 +2695,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+				CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				"fake-directory",
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -2734,7 +2709,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
-				CheckpointProperties expected = CheckpointProperties.forExternalizedCheckpoint(true);
+				CheckpointProperties expected = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
 				assertEquals(expected, props);
 			}
@@ -3196,13 +3171,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			new ExecutionVertex[] { vertex1 },
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -3210,7 +3184,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
 				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 				null,
 				true);
 
@@ -3220,7 +3194,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// Not periodic
 		triggerResult = coord.triggerCheckpoint(
 				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 				null,
 				false);
 
@@ -3376,13 +3350,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -3397,7 +3370,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 		verify(tracker, times(1))
-			.reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forStandardCheckpoint()));
+			.reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
 	}
 
 	/**
@@ -3416,13 +3389,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			new StandaloneCheckpointIDCounter(),
 			store,
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -3434,9 +3406,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			0,
 			Collections.<OperatorID, OperatorState>emptyMap(),
 			Collections.<MasterState>emptyList(),
-			CheckpointProperties.forStandardCheckpoint(),
-			null,
-			null));
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			new EmptyStreamStateHandle(),
+			"some/dummy/pointer"));
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3474,13 +3446,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000L,
 			0L,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			new ExecutionVertex[]{vertex1},
 			checkpointIDCounter,
 			completedCheckpointStore,
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -3568,13 +3539,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			arrayExecutionVertices,
 			new StandaloneCheckpointIDCounter(),
 			store,
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 				deleteExecutor -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
new file mode 100644
index 0000000..86eec77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointExternalResumeTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * CheckpointCoordinator tests for externalized checkpoints.
+ *
+ * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
+ * test is already huge and covers many different configurations.
+ */
+public class CheckpointExternalResumeTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Triggers multiple externalized checkpoints and verifies that the metadata
+	 * files have been created.
+	 */
+	@Test
+	public void testTriggerAndConfirmSimpleExternalizedCheckpoint() throws Exception {
+		final JobID jid = new JobID();
+
+		final File checkpointDir = tmp.newFolder();
+		final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
+
+		Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
+		jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
+		jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			stateBackend,
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// ---------------
+		// trigger checkpoint 1
+		// ---------------
+
+		{
+			final long timestamp1 = System.currentTimeMillis();
+
+			coord.triggerCheckpoint(timestamp1, false);
+
+			long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+
+			verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		// ---------------
+		// trigger checkpoint 2
+		// ---------------
+
+		{
+			final long timestamp2 = System.currentTimeMillis() + 7;
+			coord.triggerCheckpoint(timestamp2, false);
+
+			long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+			verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		// ---------------
+		// trigger checkpoint 3
+		// ---------------
+
+		{
+			final long timestamp3 = System.currentTimeMillis() + 146;
+			coord.triggerCheckpoint(timestamp3, false);
+
+			long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+			verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
+	/**
+	 * Verifies an externalized completed checkpoint instance.
+	 *
+	 * <p>The provided JobID, checkpoint ID, timestamp need to match. Also, the
+	 * external pointer and external metadata need to be notNull and exist (currently
+	 * assuming that they are file system based).
+	 *
+	 * @param checkpoint Completed checkpoint to check.
+	 * @param jid JobID of the job the checkpoint belongs to.
+	 * @param checkpointId Checkpoint ID of the checkpoint to check.
+	 * @param timestamp Timestamp of the checkpoint to check.
+	 */
+	private void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
+		assertEquals(jid, checkpoint.getJobId());
+		assertEquals(checkpointId, checkpoint.getCheckpointID());
+		assertEquals(timestamp, checkpoint.getTimestamp());
+		assertNotNull(checkpoint.getExternalPointer());
+		FileStateHandle fsHandle = (FileStateHandle) checkpoint.getMetadataHandle();
+		assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
+	}
+
+	private void verifyExternalizedCheckpointRestore(
+			CompletedCheckpoint checkpoint,
+			Map<JobVertexID, ExecutionJobVertex> jobVertices,
+			ExecutionVertex... vertices) throws IOException {
+
+		String pointer = checkpoint.getExternalPointer();
+		StreamStateHandle metadataHandle = new FsStateBackend(tmp.getRoot().toURI()).resolveCheckpoint(pointer);
+
+		CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(
+				checkpoint.getJobId(),
+				jobVertices,
+				checkpoint.getExternalPointer(),
+				metadataHandle,
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		for (ExecutionVertex vertex : vertices) {
+			for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
+				assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
new file mode 100644
index 0000000..ee60fc7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A test that checks that checkpoint metadata loading works properly, including validation
+ * of resumed state and dropped state.
+ */
+public class CheckpointMetadataLoadingTest {
+
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	/**
+	 * Tests loading and validation of savepoints with correct setup,
+	 * parallelism mismatch, and a missing task.
+	 */
+	@Test
+	public void testLoadAndValidateSavepoint() throws Exception {
+		File tmp = tmpFolder.newFolder();
+
+		int parallelism = 128128;
+		long checkpointId = Integer.MAX_VALUE + 123123L;
+		JobVertexID jobVertexID = new JobVertexID();
+		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+
+		OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+				new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
+				null,
+				null,
+				null);
+
+		OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
+		state.putState(0, subtaskState);
+
+		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
+		taskStates.put(operatorID, state);
+
+		JobID jobId = new JobID();
+
+		// Store savepoint
+		final SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values(), Collections.emptyList());
+		final StreamStateHandle serializedMetadata;
+
+		try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+			Checkpoints.storeCheckpointMetadata(savepoint, os);
+			serializedMetadata = new ByteStreamStateHandle("checkpoint", os.toByteArray());
+		}
+
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(parallelism);
+		when(vertex.getMaxParallelism()).thenReturn(parallelism);
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(jobVertexID, vertex);
+
+		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
+
+		// 1) Load and validate: everything correct
+		CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+
+		assertEquals(jobId, loaded.getJobId());
+		assertEquals(checkpointId, loaded.getCheckpointID());
+
+		// 2) Load and validate: max parallelism mismatch
+		when(vertex.getMaxParallelism()).thenReturn(222);
+		when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+
+		try {
+			Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
+		}
+
+		// 3) Load and validate: missing vertex
+		assertNotNull(tasks.remove(jobVertexID));
+
+		try {
+			Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, false);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+		}
+
+		// 4) Load and validate: ignore missing vertex
+		Checkpoints.loadAndValidateCheckpoint(jobId, tasks, "fake/path", serializedMetadata, ucl, true);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index a0509c4..c17172b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -31,40 +31,22 @@ import static org.junit.Assert.assertTrue;
 public class CheckpointPropertiesTest {
 
 	/**
-	 * Tests the default checkpoint properties.
-	 */
-	@Test
-	public void testCheckpointProperties() {
-		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
-
-		assertFalse(props.forceCheckpoint());
-		assertFalse(props.externalizeCheckpoint());
-		assertTrue(props.discardOnSubsumed());
-		assertTrue(props.discardOnJobFinished());
-		assertTrue(props.discardOnJobCancelled());
-		assertTrue(props.discardOnJobFailed());
-		assertTrue(props.discardOnJobSuspended());
-	}
-
-	/**
 	 * Tests the external checkpoints properties.
 	 */
 	@Test
-	public void testExternalizedCheckpointProperties() {
-		CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+	public void testCheckpointProperties() {
+		CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
 		assertFalse(props.forceCheckpoint());
-		assertTrue(props.externalizeCheckpoint());
 		assertTrue(props.discardOnSubsumed());
 		assertTrue(props.discardOnJobFinished());
 		assertTrue(props.discardOnJobCancelled());
 		assertFalse(props.discardOnJobFailed());
 		assertTrue(props.discardOnJobSuspended());
 
-		props = CheckpointProperties.forExternalizedCheckpoint(false);
+		props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
 
 		assertFalse(props.forceCheckpoint());
-		assertTrue(props.externalizeCheckpoint());
 		assertTrue(props.discardOnSubsumed());
 		assertTrue(props.discardOnJobFinished());
 		assertFalse(props.discardOnJobCancelled());
@@ -77,10 +59,9 @@ public class CheckpointPropertiesTest {
 	 */
 	@Test
 	public void testSavepointProperties() {
-		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+		CheckpointProperties props = CheckpointProperties.forSavepoint();
 
 		assertTrue(props.forceCheckpoint());
-		assertTrue(props.externalizeCheckpoint());
 		assertFalse(props.discardOnSubsumed());
 		assertFalse(props.discardOnJobFinished());
 		assertFalse(props.discardOnJobCancelled());
@@ -94,22 +75,17 @@ public class CheckpointPropertiesTest {
 	@Test
 	public void testIsSavepoint() throws Exception {
 		{
-			CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
-			assertFalse(props.isSavepoint());
-		}
-
-		{
-			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+			CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 			assertFalse(props.isSavepoint());
 		}
 
 		{
-			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
+			CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
 			assertFalse(props.isSavepoint());
 		}
 
 		{
-			CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+			CheckpointProperties props = CheckpointProperties.forSavepoint();
 			assertTrue(props.isSavepoint());
 
 			CheckpointProperties deserializedCheckpointProperties =

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index e98efc2..b8a2a54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import java.io.IOException;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -34,21 +32,24 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -83,7 +84,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 					10000L,
 					0L,
 					1,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					true),
 				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
 				serHooks);
@@ -142,6 +143,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 		/**
 		 * Simulate a custom option that is not in the normal classpath.
 		 */
+		@SuppressWarnings("unused")
 		private Serializable customOption;
 
 		public CustomStateBackend(Serializable customOption) {
@@ -149,9 +151,19 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 		}
 
 		@Override
+		public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+			return mock(CheckpointStorage.class);
+		}
+
+		@Override
 		public CheckpointStreamFactory createStreamFactory(
 			JobID jobId, String operatorIdentifier) throws IOException {
-			return null;
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
@@ -159,7 +171,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 			JobID jobId,
 			String operatorIdentifier,
 			@Nullable String targetLocation) throws IOException {
-			return null;
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
@@ -171,13 +183,13 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
-			return null;
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
 		public OperatorStateBackend createOperatorStateBackend(
 			Environment env, String operatorIdentifier) throws Exception {
-			return null;
+			throw new UnsupportedOperationException();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index df2d37a..ae354bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -101,13 +101,12 @@ public class CheckpointStateRestoreTest {
 				200000L,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 				new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 				new ExecutionVertex[0],
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -179,13 +178,12 @@ public class CheckpointStateRestoreTest {
 				200000L,
 				0,
 				Integer.MAX_VALUE,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[0],
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
-				null,
 				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
@@ -207,7 +205,7 @@ public class CheckpointStateRestoreTest {
 	/**
 	 * Tests that the allow non restored state flag is correctly handled.
 	 *
-	 * The flag only applies for state that is part of the checkpoint.
+	 * <p>The flag only applies for state that is part of the checkpoint.
 	 */
 	@Test
 	public void testNonRestoredState() throws Exception {
@@ -238,13 +236,12 @@ public class CheckpointStateRestoreTest {
 			Integer.MAX_VALUE,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			new ExecutionVertex[] {},
 			new ExecutionVertex[] {},
 			new ExecutionVertex[] {},
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
@@ -266,9 +263,9 @@ public class CheckpointStateRestoreTest {
 			2,
 			new HashMap<>(checkpointTaskStates),
 			Collections.<MasterState>emptyList(),
-			CheckpointProperties.forStandardCheckpoint(),
-			null,
-			null);
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			new EmptyStreamStateHandle(),
+			"<pointer>");
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
@@ -294,9 +291,9 @@ public class CheckpointStateRestoreTest {
 			3,
 			new HashMap<>(checkpointTaskStates),
 			Collections.<MasterState>emptyList(),
-			CheckpointProperties.forStandardCheckpoint(),
-			null,
-			null);
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			new EmptyStreamStateHandle(),
+			"pointer");
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 95a31d4..a410d26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,23 +175,23 @@ public class CheckpointStatsHistoryTest {
 		PendingCheckpointStats pending = mock(PendingCheckpointStats.class);
 		when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
 		when(pending.getCheckpointId()).thenReturn(checkpointId);
-		when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(pending.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		return pending;
 	}
 
 	private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) {
 		CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
 		when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
-		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(completed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		when(completed.getCheckpointId()).thenReturn(checkpointId);
-		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(completed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		return completed;
 	}
 
 	private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) {
 		FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
 		when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
-		when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(failed.getProperties()).thenReturn(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		when(failed.getCheckpointId()).thenReturn(checkpointId);
 		return failed;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
index 6500369..b4bb88a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
@@ -45,7 +45,11 @@ public class CheckpointStatsSnapshotTest {
 		summary.updateSummary(createCompletedCheckpointsStats(2221, 3333, 9122));
 
 		CheckpointStatsHistory history = new CheckpointStatsHistory(1);
-		RestoredCheckpointStats restored = new RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, null);
+		RestoredCheckpointStats restored = new RestoredCheckpointStats(
+				1,
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				99119,
+				null);
 
 		CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot(
 			counts,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 0d19cd5..82dcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -38,9 +38,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+
 import org.junit.Test;
 
 public class CheckpointStatsTrackerTest {
@@ -63,7 +63,7 @@ public class CheckpointStatsTrackerTest {
 				19191992L,
 				191929L,
 				123,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				false
 			),
 			null);
@@ -98,7 +98,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
 			0,
 			1,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
 		pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -147,7 +147,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats completed1 = tracker.reportPendingCheckpoint(
 			0,
 			1,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		completed1.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
 		completed1.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -159,7 +159,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats failed = tracker.reportPendingCheckpoint(
 			1,
 			1,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		failed.reportFailedCheckpoint(12, null);
 
@@ -167,7 +167,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats savepoint = tracker.reportPendingCheckpoint(
 			2,
 			1,
-			CheckpointProperties.forStandardSavepoint());
+			CheckpointProperties.forSavepoint());
 
 		savepoint.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
 		savepoint.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(1));
@@ -179,9 +179,9 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats inProgress = tracker.reportPendingCheckpoint(
 			3,
 			1,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
-		RestoredCheckpointStats restored = new RestoredCheckpointStats(81, CheckpointProperties.forStandardCheckpoint(), 123, null);
+		RestoredCheckpointStats restored = new RestoredCheckpointStats(81, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 123, null);
 		tracker.reportRestoredCheckpoint(restored);
 
 		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
@@ -254,7 +254,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
 			0,
 			1,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		pending.reportSubtaskStats(jobVertex.getJobVertexId(), createSubtaskStats(0));
 
@@ -270,7 +270,7 @@ public class CheckpointStatsTrackerTest {
 		assertNotEquals(snapshot2, snapshot3);
 
 		// Restore operation => new snapshot
-		tracker.reportRestoredCheckpoint(new RestoredCheckpointStats(12, CheckpointProperties.forStandardCheckpoint(), 12, null));
+		tracker.reportRestoredCheckpoint(new RestoredCheckpointStats(12, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 12, null));
 
 		CheckpointStatsSnapshot snapshot4 = tracker.createSnapshot();
 		assertNotEquals(snapshot3, snapshot4);
@@ -373,7 +373,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats pending = stats.reportPendingCheckpoint(
 			0,
 			0,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		// Check counts
 		assertEquals(Long.valueOf(1), numCheckpoints.getValue());
@@ -415,7 +415,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats nextPending = stats.reportPendingCheckpoint(
 			1,
 			11,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		long failureTimestamp = 1230123L;
 		nextPending.reportFailedCheckpoint(failureTimestamp, null);
@@ -430,7 +430,7 @@ public class CheckpointStatsTrackerTest {
 		long restoreTimestamp = 183419283L;
 		RestoredCheckpointStats restored = new RestoredCheckpointStats(
 			1,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			restoreTimestamp,
 			null);
 		stats.reportRestoredCheckpoint(restored);
@@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest {
 		PendingCheckpointStats thirdPending = stats.reportPendingCheckpoint(
 			2,
 			5000,
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
 		thirdPending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats);
 		thirdPending.reportCompletedCheckpoint(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
index dd9c2c8..2910765 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -93,7 +93,7 @@ public class CompletedCheckpointStatsSummaryTest {
 		return new CompletedCheckpointStats(
 			checkpointId,
 			triggerTimestamp,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			1,
 			taskStats,
 			1,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 320dc2d..d113aa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -196,7 +197,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		SharedStateRegistry sharedStateRegistry) throws IOException {
 
 		int numberOfStates = 4;
-		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+		CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
 
 		OperatorID operatorID = new OperatorID();
 
@@ -256,7 +257,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			long timestamp,
 			Map<OperatorID, OperatorState> operatorGroupState,
 			CheckpointProperties props) {
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props, null, null);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props,
+					new EmptyStreamStateHandle(), "<pointer");
 		}
 
 		@Override