You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:19 UTC

[02/17] flink git commit: [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 293675c..98b6647 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -25,11 +25,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.util.Collections;
@@ -63,7 +64,7 @@ public class CompletedCheckpointTest {
 				new JobID(), 0, 0, 1,
 				taskStates,
 				Collections.<MasterState>emptyList(),
-				CheckpointProperties.forStandardCheckpoint(),
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 				new FileStateHandle(new Path(file.toURI()), file.length()),
 				file.getAbsolutePath());
 
@@ -81,16 +82,18 @@ public class CompletedCheckpointTest {
 		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
 		operatorStates.put(new OperatorID(), state);
 
+		StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
+
 		boolean discardSubsumed = true;
-		CheckpointProperties props = new CheckpointProperties(false, false, false, discardSubsumed, true, true, true, true);
+		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
 		
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 				new JobID(), 0, 0, 1,
 				operatorStates,
 				Collections.<MasterState>emptyList(),
 				props,
-				null,
-				null);
+				metadataHandle,
+				"some/mock/pointer");
 
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -100,6 +103,7 @@ public class CompletedCheckpointTest {
 		checkpoint.discardOnSubsume();
 
 		verify(state, times(1)).discardState();
+		verify(metadataHandle).discardState();
 	}
 
 	/**
@@ -107,49 +111,48 @@ public class CompletedCheckpointTest {
 	 */
 	@Test
 	public void testCleanUpOnShutdown() throws Exception {
-		File file = tmpFolder.newFile();
-		String externalPath = file.getAbsolutePath();
-
 		JobStatus[] terminalStates = new JobStatus[] {
 				JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, JobStatus.SUSPENDED
 		};
 
-		OperatorState state = mock(OperatorState.class);
-		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
-		operatorStates.put(new OperatorID(), state);
-
 		for (JobStatus status : terminalStates) {
-			Mockito.reset(state);
+
+			OperatorState state = mock(OperatorState.class);
+			Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+			operatorStates.put(new OperatorID(), state);
+
+			StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
 
 			// Keep
-			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
 					new HashMap<>(operatorStates),
 					Collections.<MasterState>emptyList(),
 					props,
-					new FileStateHandle(new Path(file.toURI()), file.length()),
-					externalPath);
+					metadataHandle,
+					"mock://some/pointer");
 
 			SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 
 			checkpoint.discardOnShutdown(status);
 			verify(state, times(0)).discardState();
-			assertEquals(true, file.exists());
+			verify(metadataHandle, times(0)).discardState();
 
 			// Discard
-			props = new CheckpointProperties(false, false, false, true, true, true, true, true);
+			props = new CheckpointProperties(false, false, true, true, true, true, true);
 			checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
 					new HashMap<>(operatorStates),
 					Collections.<MasterState>emptyList(),
 					props,
-					null,
-					null);
+					metadataHandle,
+					"pointer");
 
 			checkpoint.discardOnShutdown(status);
 			verify(state, times(1)).discardState();
+			verify(metadataHandle, times(1)).discardState();
 		}
 	}
 
@@ -169,9 +172,9 @@ public class CompletedCheckpointTest {
 			1,
 			new HashMap<>(operatorStates),
 			Collections.<MasterState>emptyList(),
-			CheckpointProperties.forStandardCheckpoint(),
-			null,
-			null);
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			mock(StreamStateHandle.class),
+			"pointer");
 
 		CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
 		completed.setDiscardCallback(callback);
@@ -192,7 +195,7 @@ public class CompletedCheckpointTest {
 		CompletedCheckpointStats completed = new CompletedCheckpointStats(
 			123123123L,
 			10123L,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			1337,
 			taskStats,
 			1337,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 7d6c7b5..1d44444 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -29,15 +29,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import scala.concurrent.Await;
@@ -80,7 +79,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 						60000,
 						0L,
 						Integer.MAX_VALUE,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null));
 			
@@ -149,7 +148,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 						60000,
 						0L,
 						Integer.MAX_VALUE,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 6a84a11..a53d6d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -95,14 +94,13 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				100,
 				100,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				Collections.emptyList(),
 				Collections.emptyList(),
 				Collections.emptyList(),
 				Collections.emptyList(),
 				counter,
 				store,
-				null,
 				new MemoryStateBackend(),
 				CheckpointStatsTrackerTest.createTestTracker());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index f1a56be..7668f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Test;
 
 import java.io.NotSerializableException;
@@ -47,7 +48,7 @@ public class FailedCheckpointStatsTest {
 		FailedCheckpointStats failed = new FailedCheckpointStats(
 			0,
 			triggerTimestamp,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			1,
 			taskStats,
 			0,
@@ -73,7 +74,7 @@ public class FailedCheckpointStatsTest {
 		FailedCheckpointStats failed = new FailedCheckpointStats(
 			123123123L,
 			triggerTimestamp,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			1337,
 			taskStats,
 			3,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 6c5e8fd..73db317 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -42,7 +42,7 @@ public class PendingCheckpointStatsTest {
 	public void testReportSubtaskStats() throws Exception {
 		long checkpointId = Integer.MAX_VALUE + 1222L;
 		long triggerTimestamp = Integer.MAX_VALUE - 1239L;
-		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+		CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
 		TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
 		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 		int totalSubtaskCount = task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks();
@@ -138,7 +138,7 @@ public class PendingCheckpointStatsTest {
 		PendingCheckpointStats pending = new PendingCheckpointStats(
 			0,
 			1,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
 			taskStats,
 			callback);
@@ -199,7 +199,7 @@ public class PendingCheckpointStatsTest {
 		PendingCheckpointStats pending = new PendingCheckpointStats(
 			0,
 			triggerTimestamp,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
 			taskStats,
 			callback);
@@ -251,7 +251,7 @@ public class PendingCheckpointStatsTest {
 		PendingCheckpointStats pending = new PendingCheckpointStats(
 			123123123L,
 			10123L,
-			CheckpointProperties.forStandardCheckpoint(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
 			1337,
 			taskStats,
 			mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index bf79457..7b6992b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -26,13 +28,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
-import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -43,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -55,6 +58,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.when;
 
+/**
+ * Tests for the {@link PendingCheckpoint}.
+ */
 public class PendingCheckpointTest {
 
 	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
@@ -63,7 +69,7 @@ public class PendingCheckpointTest {
 	static {
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
 		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
-		
+
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getMaxParallelism()).thenReturn(128);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
@@ -80,8 +86,8 @@ public class PendingCheckpointTest {
 	@Test
 	public void testCanBeSubsumed() throws Exception {
 		// Forced checkpoints cannot be subsumed
-		CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false, false);
-		PendingCheckpoint pending = createPendingCheckpoint(forced, "ignored");
+		CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false);
+		PendingCheckpoint pending = createPendingCheckpoint(forced);
 		assertFalse(pending.canBeSubsumed());
 
 		try {
@@ -92,48 +98,21 @@ public class PendingCheckpointTest {
 		}
 
 		// Non-forced checkpoints can be subsumed
-		CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false, false);
-		pending = createPendingCheckpoint(subsumed, "ignored");
+		CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false);
+		pending = createPendingCheckpoint(subsumed);
 		assertTrue(pending.canBeSubsumed());
 	}
 
 	/**
-	 * Tests that the persist checkpoint property is respected by the pending
-	 * checkpoint when finalizing.
-	 */
-	@Test
-	public void testPersistExternally() throws Exception {
-		File tmp = tmpFolder.newFolder();
-
-		// Persisted checkpoint
-		CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false, false);
-
-		PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
-		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
-		assertEquals(0, tmp.listFiles().length);
-		pending.finalizeCheckpointExternalized();
-		assertEquals(1, tmp.listFiles().length);
-
-		// Ephemeral checkpoint
-		CheckpointProperties ephemeral = new CheckpointProperties(false, false, false, true, true, true, true, true);
-		pending = createPendingCheckpoint(ephemeral, null);
-		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
-
-		assertEquals(1, tmp.listFiles().length);
-		pending.finalizeCheckpointNonExternalized();
-		assertEquals(1, tmp.listFiles().length);
-	}
-
-	/**
 	 * Tests that the completion future is succeeded on finalize and failed on
 	 * abort and failures during finalize.
 	 */
 	@Test
 	public void testCompletionFuture() throws Exception {
-		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 
 		// Abort declined
-		PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
+		PendingCheckpoint pending = createPendingCheckpoint(props);
 		CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
@@ -141,7 +120,7 @@ public class PendingCheckpointTest {
 		assertTrue(future.isDone());
 
 		// Abort expired
-		pending = createPendingCheckpoint(props, "ignored");
+		pending = createPendingCheckpoint(props);
 		future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
@@ -149,7 +128,7 @@ public class PendingCheckpointTest {
 		assertTrue(future.isDone());
 
 		// Abort subsumed
-		pending = createPendingCheckpoint(props, "ignored");
+		pending = createPendingCheckpoint(props);
 		future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
@@ -157,29 +136,22 @@ public class PendingCheckpointTest {
 		assertTrue(future.isDone());
 
 		// Finalize (all ACK'd)
-		String target = tmpFolder.newFolder().getAbsolutePath();
-		pending = createPendingCheckpoint(props, target);
+		pending = createPendingCheckpoint(props);
 		future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
 		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
 		assertTrue(pending.isFullyAcknowledged());
-		pending.finalizeCheckpointExternalized();
+		pending.finalizeCheckpoint();
 		assertTrue(future.isDone());
 
 		// Finalize (missing ACKs)
-		pending = createPendingCheckpoint(props, "ignored");
+		pending = createPendingCheckpoint(props);
 		future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
 		try {
-			pending.finalizeCheckpointNonExternalized();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-			// Expected
-		}
-		try {
-			pending.finalizeCheckpointExternalized();
+			pending.finalizeCheckpoint();
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException ignored) {
 			// Expected
@@ -192,16 +164,14 @@ public class PendingCheckpointTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testAbortDiscardsState() throws Exception {
-		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
+		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 		QueueExecutor executor = new QueueExecutor();
 
 		OperatorState state = mock(OperatorState.class);
 		doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
 
-		String targetDir = tmpFolder.newFolder().getAbsolutePath();
-
 		// Abort declined
-		PendingCheckpoint pending = createPendingCheckpoint(props, targetDir, executor);
+		PendingCheckpoint pending = createPendingCheckpoint(props, executor);
 		setTaskState(pending, state);
 
 		pending.abortDeclined();
@@ -212,7 +182,7 @@ public class PendingCheckpointTest {
 		// Abort error
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir, executor);
+		pending = createPendingCheckpoint(props, executor);
 		setTaskState(pending, state);
 
 		pending.abortError(new Exception("Expected Test Exception"));
@@ -223,7 +193,7 @@ public class PendingCheckpointTest {
 		// Abort expired
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir, executor);
+		pending = createPendingCheckpoint(props, executor);
 		setTaskState(pending, state);
 
 		pending.abortExpired();
@@ -234,7 +204,7 @@ public class PendingCheckpointTest {
 		// Abort subsumed
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir, executor);
+		pending = createPendingCheckpoint(props, executor);
 		setTaskState(pending, state);
 
 		pending.abortSubsumed();
@@ -251,20 +221,22 @@ public class PendingCheckpointTest {
 		{
 			// Complete successfully
 			PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-			PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+			PendingCheckpoint pending = createPendingCheckpoint(
+					CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 			pending.setStatsCallback(callback);
 
 			pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
 			verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class));
 
-			pending.finalizeCheckpointNonExternalized();
+			pending.finalizeCheckpoint();
 			verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
 		}
 
 		{
 			// Fail subsumed
 			PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-			PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+			PendingCheckpoint pending = createPendingCheckpoint(
+					CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 			pending.setStatsCallback(callback);
 
 			pending.abortSubsumed();
@@ -274,7 +246,8 @@ public class PendingCheckpointTest {
 		{
 			// Fail subsumed
 			PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-			PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+			PendingCheckpoint pending = createPendingCheckpoint(
+					CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 			pending.setStatsCallback(callback);
 
 			pending.abortDeclined();
@@ -284,7 +257,8 @@ public class PendingCheckpointTest {
 		{
 			// Fail subsumed
 			PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-			PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+			PendingCheckpoint pending = createPendingCheckpoint(
+					CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 			pending.setStatsCallback(callback);
 
 			pending.abortError(new Exception("Expected test error"));
@@ -294,7 +268,8 @@ public class PendingCheckpointTest {
 		{
 			// Fail subsumed
 			PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
-			PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+			PendingCheckpoint pending = createPendingCheckpoint(
+					CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 			pending.setStatsCallback(callback);
 
 			pending.abortExpired();
@@ -303,41 +278,43 @@ public class PendingCheckpointTest {
 	}
 
 	/**
-	 * FLINK-5985
-	 * <p>
-	 * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
+	 * FLINK-5985.
+	 *
+	 * <p>Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
 	 * should not appear in the task states map of the checkpoint.
 	 */
 	@Test
 	public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
-		PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+		PendingCheckpoint pending = createPendingCheckpoint(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
 		Assert.assertTrue(pending.getOperatorStates().isEmpty());
 	}
 
 	/**
-	 * FLINK-5985
-	 * <p>
-	 * This tests checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
+	 * FLINK-5985.
+	 *
+	 * <p>This test checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
 	 * for subtasks that acknowledge some state are given an entry in the task states of the checkpoint.
 	 */
 	@Test
 	public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
-		PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+		PendingCheckpoint pending = createPendingCheckpoint(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		pending.acknowledgeTask(ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
 		Assert.assertFalse(pending.getOperatorStates().isEmpty());
 	}
 
 	@Test
-	public void testSetCanceller() {
-		final CheckpointProperties props = new CheckpointProperties(false, false, false, true, true, true, true, true);
+	public void testSetCanceller() throws Exception {
+		final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
 
-		PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+		PendingCheckpoint aborted = createPendingCheckpoint(props);
 		aborted.abortDeclined();
 		assertTrue(aborted.isDiscarded());
 		assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
 
-		PendingCheckpoint pending = createPendingCheckpoint(props, null);
+		PendingCheckpoint pending = createPendingCheckpoint(props);
 		ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
 
 		assertTrue(pending.setCancellerHandle(canceller));
@@ -347,23 +324,25 @@ public class PendingCheckpointTest {
 
 	// ------------------------------------------------------------------------
 
-	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
-		return createPendingCheckpoint(props, targetDirectory, Executors.directExecutor());
+	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
+		return createPendingCheckpoint(props, Executors.directExecutor());
 	}
 
-	private static PendingCheckpoint createPendingCheckpoint(
-			CheckpointProperties props,
-			String targetDirectory,
-			Executor executor) {
+	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
+
+		final Path checkpointDir = new Path(tmpFolder.newFolder().toURI());
+		final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
+				LocalFileSystem.getSharedInstance(), checkpointDir, checkpointDir, checkpointDir);
+
+		final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
 
-		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
 		return new PendingCheckpoint(
 			new JobID(),
 			0,
 			1,
 			ackTasks,
 			props,
-			targetDirectory,
+			location,
 			executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
index d43283d..85b1516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
@@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
 	public void testSimpleAccess() throws Exception {
 		long checkpointId = Integer.MAX_VALUE + 1L;
 		long triggerTimestamp = Integer.MAX_VALUE + 1L;
-		CheckpointProperties props = new CheckpointProperties(true, true, false, false, false, true, false, true);
+		CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false, true);
 		long restoreTimestamp = Integer.MAX_VALUE + 1L;
 		String externalPath = "external-path";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7c19b19..a167130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.EmptyStreamStateHandle;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -32,8 +33,10 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.ErrorListenerPathable;
 import org.apache.curator.utils.EnsurePath;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -92,8 +95,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			1L,
 			new HashMap<>(),
 			null,
-			CheckpointProperties.forStandardCheckpoint(),
-			null, null);
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			new EmptyStreamStateHandle(),
+				"<pointer>");
 
 		final CompletedCheckpoint completedCheckpoint2 = new CompletedCheckpoint(
 			new JobID(),
@@ -102,8 +106,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			2L,
 			new HashMap<>(),
 			null,
-			CheckpointProperties.forStandardCheckpoint(),
-			null, null);
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			new EmptyStreamStateHandle(),
+			"<pointer");
 
 		final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
 		expectedCheckpointIds.add(1L);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index 3498a41..ccc8afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.checkpoint.hooks;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.concurrent.CompletableFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
deleted file mode 100644
index a461569..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class SavepointLoaderTest {
-
-	@Rule
-	public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-	/**
-	 * Tests loading and validation of savepoints with correct setup,
-	 * parallelism mismatch, and a missing task.
-	 */
-	@Test
-	public void testLoadAndValidateSavepoint() throws Exception {
-		File tmp = tmpFolder.newFolder();
-
-		int parallelism = 128128;
-		long checkpointId = Integer.MAX_VALUE + 123123L;
-		JobVertexID jobVertexID = new JobVertexID();
-		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
-
-		OperatorSubtaskState subtaskState = new OperatorSubtaskState(
-			new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
-			null,
-			null,
-			null);
-
-		OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
-		state.putState(0, subtaskState);
-
-		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
-		taskStates.put(operatorID, state);
-
-		JobID jobId = new JobID();
-
-		// Store savepoint
-		SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values(), Collections.<MasterState>emptyList());
-		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
-
-		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-		when(vertex.getParallelism()).thenReturn(parallelism);
-		when(vertex.getMaxParallelism()).thenReturn(parallelism);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
-
-		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
-		tasks.put(jobVertexID, vertex);
-
-		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
-
-		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
-
-		assertEquals(jobId, loaded.getJobId());
-		assertEquals(checkpointId, loaded.getCheckpointID());
-
-		// 2) Load and validate: max parallelism mismatch
-		when(vertex.getMaxParallelism()).thenReturn(222);
-		when(vertex.isMaxParallelismConfigured()).thenReturn(true);
-
-		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException expected) {
-			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
-		}
-
-		// 3) Load and validate: missing vertex
-		assertNotNull(tasks.remove(jobVertexID));
-
-		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException expected) {
-			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
-		}
-
-		// 4) Load and validate: ignore missing vertex
-		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, true);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
deleted file mode 100644
index 0444936..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import java.io.File;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-public class SavepointStoreTest {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	/**
-	 * Tests a store-load-dispose sequence.
-	 */
-	@Test
-	public void testStoreLoadDispose() throws Exception {
-		String root = tmp.getRoot().getAbsolutePath();
-		File rootFile = new File(root);
-
-		File[] list = rootFile.listFiles();
-
-		assertNotNull(list);
-		assertEquals(0, list.length);
-
-		// Store
-		String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
-		SavepointV2 stored = new SavepointV2(
-			1929292,
-			CheckpointTestUtils.createOperatorStates(4, 24),
-			Collections.<MasterState>emptyList());
-		String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
-
-		list = rootFile.listFiles();
-		assertNotNull(list);
-		assertEquals(1, list.length);
-
-		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
-
-		assertEquals(stored.getCheckpointId(), loaded.getCheckpointId());
-		assertEquals(stored.getOperatorStates(), loaded.getOperatorStates());
-		assertEquals(stored.getMasterStates(), loaded.getMasterStates());
-
-		loaded.dispose();
-
-		// Dispose
-		SavepointStore.deleteSavepointDirectory(path);
-
-		list = rootFile.listFiles();
-		assertNotNull(list);
-		assertEquals(0, list.length);
-	}
-
-	/**
-	 * Tests loading with unexpected magic number.
-	 */
-	@Test
-	public void testUnexpectedSavepoint() throws Exception {
-		// Random file
-		Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
-		FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, FileSystem.WriteMode.NO_OVERWRITE);
-		DataOutputStream dos = new DataOutputStream(fdos);
-		for (int i = 0; i < 10; i++) {
-			dos.writeLong(ThreadLocalRandom.current().nextLong());
-		}
-
-		try {
-			SavepointStore.loadSavepoint(filePath.toString(), Thread.currentThread().getContextClassLoader());
-			fail("Did not throw expected Exception");
-		} catch (RuntimeException e) {
-			assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
-		}
-	}
-
-	/**
-	 * Tests addition of a new savepoint version.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testMultipleSavepointVersions() throws Exception {
-		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-		field.setAccessible(true);
-		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-		assertTrue(serializers.size() >= 1);
-
-		String root = tmp.getRoot().getAbsolutePath();
-		File rootFile = new File(root);
-
-		// New savepoint type for test
-		int version = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); // make this a positive number
-		long checkpointId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); // make this a positive number
-
-		// Add serializer
-		serializers.put(version, NewSavepointSerializer.INSTANCE);
-
-		String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
-		TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
-		String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
-
-		File[] list = rootFile.listFiles();
-
-		assertNotNull(list);
-		assertEquals(1, list.length);
-
-		// Savepoint v0
-		String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
-		SavepointV2 savepoint = new SavepointV2(
-			checkpointId,
-			CheckpointTestUtils.createOperatorStates(4, 32),
-			Collections.<MasterState>emptyList());
-		String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
-
-		list = rootFile.listFiles();
-
-		assertNotNull(list);
-		assertEquals(2, list.length);
-
-		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
-		assertEquals(newSavepoint, loaded);
-
-		loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader());
-		assertEquals(savepoint.getCheckpointId(), loaded.getCheckpointId());
-		assertEquals(savepoint.getTaskStates(), loaded.getTaskStates());
-		assertEquals(savepoint.getMasterStates(), loaded.getMasterStates());
-	}
-
-	/**
-	 * Tests that an exception during store cleans up the created savepoint file.
-	 */
-	@Test
-	public void testCleanupOnStoreFailure() throws Exception {
-		Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
-		field.setAccessible(true);
-		Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
-
-		String target = tmp.getRoot().getAbsolutePath();
-
-		final int version = 123123;
-		SavepointSerializer<TestSavepoint> serializer = mock(SavepointSerializer.class);
-		doThrow(new RuntimeException("Test Exception")).when(serializer)
-				.serialize(Matchers.any(TestSavepoint.class), any(DataOutputStream.class));
-
-		serializers.put(version, serializer);
-
-		Savepoint savepoint = new TestSavepoint(version, 12123123);
-
-		assertEquals(0, tmp.getRoot().listFiles().length);
-
-		try {
-			SavepointStore.storeSavepoint(target, savepoint);
-		} catch (Throwable ignored) {
-		}
-
-		assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
-	}
-
-	/**
-	 * Tests that multiple externalized checkpoints can be stored to the same
-	 * directory.
-	 */
-	@Test
-	public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception {
-		String root = tmp.newFolder().getAbsolutePath();
-		FileSystem fs = FileSystem.get(new Path(root).toUri());
-
-		// Store
-		SavepointV2 savepoint = new SavepointV2(
-			1929292,
-			CheckpointTestUtils.createOperatorStates(4, 24),
-			Collections.<MasterState>emptyList());
-
-		FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
-		fs.exists(store1.getFilePath());
-		assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
-
-		FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
-		fs.exists(store2.getFilePath());
-		assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
-	}
-
-	private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
-
-		private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();
-
-		@Override
-		public void serialize(TestSavepoint savepoint, DataOutputStream dos) throws IOException {
-			dos.writeInt(savepoint.version);
-			dos.writeLong(savepoint.checkpointId);
-		}
-
-		@Override
-		public TestSavepoint deserialize(DataInputStream dis, ClassLoader userCL) throws IOException {
-			int version = dis.readInt();
-			long checkpointId = dis.readLong();
-			return new TestSavepoint(version, checkpointId);
-		}
-
-	}
-
-	private static class TestSavepoint implements Savepoint {
-
-		private final int version;
-		private final long checkpointId;
-
-		public TestSavepoint(int version, long checkpointId) {
-			this.version = version;
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public int getVersion() {
-			return version;
-		}
-
-		@Override
-		public long getCheckpointId() {
-			return checkpointId;
-		}
-
-		@Override
-		public Collection<TaskState> getTaskStates() {
-			return Collections.emptyList();
-		}
-
-		@Override
-		public Collection<MasterState> getMasterStates() {
-			return Collections.emptyList();
-		}
-
-		@Override
-		public Collection<OperatorState> getOperatorStates() {
-			return null;
-		}
-
-		@Override
-		public void dispose() {
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			TestSavepoint that = (TestSavepoint) o;
-			return version == that.version && checkpointId == that.checkpointId;
-
-		}
-
-		@Override
-		public int hashCode() {
-			int result = version;
-			result = 31 * result + (int) (checkpointId ^ (checkpointId >>> 32));
-			return result;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3fe8613..ba93b7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -15,20 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -126,22 +125,16 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 			100,
 			100,
 			1,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<MasterTriggerRestoreHook<?>>emptyList(),
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
-			null,
 			new MemoryStateBackend(),
 			statsTracker);
 
-		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
-		userAccumulators.put("userAcc", new LongCounter(64));
-
-		Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
-
 		runtimeGraph.setJsonPlan("{}");
 
 		runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12e9b5d..55166d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -50,7 +51,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -690,7 +690,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 					10 * 60 * 1000,
 					0,
 					1,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					false),
 				null));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 4725296..a3a26f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -41,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -77,7 +77,7 @@ import static org.mockito.Mockito.when;
 /**
  * These tests make sure that global failover (restart all) always takes precedence over
  * local recovery strategies.
- * 
+ *
  * <p>This test must be in the package it resides in, because it uses package-private methods
  * from the ExecutionGraph classes.
  */
@@ -312,7 +312,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
 			100000L,
 			1L,
 			3,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			true);
 
 		final ExecutionGraph graph = createSampleGraph(
@@ -330,14 +330,13 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
 			checkpointCoordinatorConfiguration.getCheckpointTimeout(),
 			checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
 			checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
-			checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(),
+			checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy(),
 			allVertices,
 			allVertices,
 			allVertices,
 			Collections.emptyList(),
 			standaloneCheckpointIDCounter,
 			new StandaloneCompletedCheckpointStore(1),
-			"",
 			new MemoryStateBackend(),
 			new CheckpointStatsTracker(
 				1,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 721b8f1..51e7fec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.SerializedValue;
@@ -47,7 +48,7 @@ public class JobCheckpointingSettingsTest {
 				1231,
 				112,
 				12,
-				ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+				CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
 				false),
 			new SerializedValue<>(new MemoryStateBackend()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 79f7342..af9c8b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -56,7 +57,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -252,7 +252,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 						10L * 60L * 1000L,
 						0L,
 						1,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 452afd6..f8da2a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
@@ -61,7 +62,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -893,7 +893,7 @@ public class JobManagerTest extends TestLogger {
 					3600000,
 					0,
 					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					true),
 					null);
 
@@ -921,7 +921,7 @@ public class JobManagerTest extends TestLogger {
 				if (cancelResp instanceof CancellationFailure) {
 					CancellationFailure failure = (CancellationFailure) cancelResp;
 					if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
-						Thread.sleep(200); // wait and retry
+						Thread.sleep(10); // wait and retry
 					} else {
 						failure.cause().printStackTrace();
 						fail("Failed to cancel job: " + failure.cause().getMessage());
@@ -983,7 +983,7 @@ public class JobManagerTest extends TestLogger {
 			3600000,
 			0,
 			Integer.MAX_VALUE,
-			ExternalizedCheckpointSettings.none(),
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 			true);
 
 		JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
@@ -1105,7 +1105,7 @@ public class JobManagerTest extends TestLogger {
 					3600000,
 					0,
 					Integer.MAX_VALUE,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					true),
 				null);
 
@@ -1220,7 +1220,7 @@ public class JobManagerTest extends TestLogger {
 						360000,
 						0,
 						Integer.MAX_VALUE,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null);
 
@@ -1336,7 +1336,7 @@ public class JobManagerTest extends TestLogger {
 						360000,
 						0,
 						Integer.MAX_VALUE,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null);
 
@@ -1384,7 +1384,7 @@ public class JobManagerTest extends TestLogger {
 						360000,
 						0,
 						Integer.MAX_VALUE,
-						ExternalizedCheckpointSettings.none(),
+						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 0ca83ae..508ccd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -235,7 +236,7 @@ public class JobSubmitTest {
 					5000,
 					0L,
 					10,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					true),
 				null));
 		return jg;

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 9e801a3..13a3194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.TestLogger;
 
@@ -97,7 +97,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 				timeout,
 				1L,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -131,7 +131,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 				timeout,
 				1L,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
@@ -184,7 +184,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 				1L,
 				1L,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -316,7 +316,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 				1L,
 				1L,
 				1,
-				ExternalizedCheckpointSettings.none(),
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 				true));
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
index c7560da..47ebb18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -55,7 +55,7 @@ public class CheckpointConfigHandlerTest {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		when(graph.getJobID()).thenReturn(new JobID());
 		CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration;
-		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+		CheckpointRetentionPolicy retentionPolicy = graphAndSettings.retentionPolicy;
 
 		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
 		Assert.assertEquals(1, archives.size());
@@ -73,8 +73,8 @@ public class CheckpointConfigHandlerTest {
 
 		JsonNode externalizedNode = rootNode.get("externalization");
 		Assert.assertNotNull(externalizedNode);
-		Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
-		Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+		Assert.assertEquals(retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, externalizedNode.get("enabled").asBoolean());
+		Assert.assertEquals(retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, externalizedNode.get("delete_on_cancellation").asBoolean());
 
 	}
 
@@ -139,7 +139,7 @@ public class CheckpointConfigHandlerTest {
 		GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
 
 		AccessExecutionGraph graph = graphAndSettings.graph;
-		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+		CheckpointRetentionPolicy retentionPolicy = graphAndSettings.retentionPolicy;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor());
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
@@ -147,8 +147,8 @@ public class CheckpointConfigHandlerTest {
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
 		assertNotNull(externalizedNode);
-		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
-		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+		assertEquals(retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, externalizedNode.get("enabled").asBoolean());
+		assertEquals(retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, externalizedNode.get("delete_on_cancellation").asBoolean());
 	}
 
 	private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
@@ -156,36 +156,37 @@ public class CheckpointConfigHandlerTest {
 		long timeout = 996979L;
 		long minPause = 119191919L;
 		int maxConcurrent = 12929329;
-		ExternalizedCheckpointSettings externalizedSetting = externalized
-			? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
-			: ExternalizedCheckpointSettings.none();
+
+		CheckpointRetentionPolicy retentionPolicy = externalized
+			? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
+			: CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 
 		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
 			interval,
 			timeout,
 			minPause,
 			maxConcurrent,
-			externalizedSetting,
+			retentionPolicy,
 			exactlyOnce);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(chkConfig);
 
-		return new GraphAndSettings(graph, chkConfig, externalizedSetting);
+		return new GraphAndSettings(graph, chkConfig, retentionPolicy);
 	}
 
 	private static class GraphAndSettings {
 		public final AccessExecutionGraph graph;
 		public final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
-		public final ExternalizedCheckpointSettings externalizedSettings;
+		public final CheckpointRetentionPolicy retentionPolicy;
 
 		public GraphAndSettings(
 				AccessExecutionGraph graph,
 				CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
-				ExternalizedCheckpointSettings externalizedSettings) {
+				CheckpointRetentionPolicy retentionPolicy) {
 			this.graph = graph;
 			this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
-			this.externalizedSettings = externalizedSettings;
+			this.retentionPolicy = retentionPolicy;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 7cc8efb..ef65e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
@@ -167,7 +168,8 @@ public class CheckpointStatsDetailsHandlerTest {
 		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
 		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(checkpoint.getProperties()).thenReturn(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
 		when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
 		when(checkpoint.getStateSize()).thenReturn(111939272822L);
@@ -234,7 +236,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
 		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
-		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
 		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
 		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
 		when(checkpoint.getStateSize()).thenReturn(925281L);
@@ -275,7 +277,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
 		when(checkpoint.getCheckpointId()).thenReturn(1818214L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
-		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
 		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
 		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
 		when(checkpoint.getStateSize()).thenReturn(925281L);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
index cc90d7f..1d5768b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -173,7 +174,7 @@ public class CheckpointStatsHandlerTest {
 		RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class);
 		when(latestRestored.getCheckpointId()).thenReturn(1199L);
 		when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
-		when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
 		when(latestRestored.getExternalPath()).thenReturn("restored savepoint path");
 
 		// History
@@ -183,7 +184,8 @@ public class CheckpointStatsHandlerTest {
 		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
 		when(inProgress.getCheckpointId()).thenReturn(1992141L);
 		when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-		when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(inProgress.getProperties()).thenReturn(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
 		when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
 		when(inProgress.getStateSize()).thenReturn(111939272822L);
@@ -195,7 +197,7 @@ public class CheckpointStatsHandlerTest {
 		CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class);
 		when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
 		when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
-		when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forSavepoint());
 		when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
 		when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
 		when(completedSavepoint.getStateSize()).thenReturn(1119822L);
@@ -209,7 +211,8 @@ public class CheckpointStatsHandlerTest {
 		FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
 		when(failed.getCheckpointId()).thenReturn(110719L);
 		when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
-		when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(failed.getProperties()).thenReturn(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 		when(failed.getTriggerTimestamp()).thenReturn(191900L);
 		when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
 		when(failed.getStateSize()).thenReturn(1119822L);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
new file mode 100644
index 0000000..4b42959
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/EmptyStreamStateHandle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * A simple dummy implementation of a stream state handle that can be passed in tests where a handle is required but its contents are never read.
+ * The handle cannot open an input stream: {@link #openInputStream()} always throws an {@link UnsupportedOperationException}, {@link #discardState()} does nothing, and {@link #getStateSize()} always returns 0.
+ */
+public class EmptyStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = 0L;
+
+	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		throw new UnsupportedOperationException(); // by design: this dummy has no stream to open
+	}
+
+	@Override
+	public void discardState() {} // intentionally empty: the dummy performs no cleanup
+
+	@Override
+	public long getStateSize() {
+		return 0;
+	}
+}