You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:22 UTC

[05/17] flink git commit: [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

[FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edc6f100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edc6f100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edc6f100

Branch: refs/heads/master
Commit: edc6f1000704a492629d7bdf8cbfa5ba5c45bb1f
Parents: d19525e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 26 21:26:00 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  26 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |   7 +-
 .../checkpoint/CheckpointCoordinator.java       | 186 +++++------
 .../checkpoint/CheckpointProperties.java        | 105 ++----
 .../checkpoint/CheckpointRetentionPolicy.java   |  37 +++
 .../flink/runtime/checkpoint/Checkpoints.java   | 327 ++++++++++++++++++
 .../runtime/checkpoint/CompletedCheckpoint.java |  85 ++---
 .../runtime/checkpoint/PendingCheckpoint.java   | 149 +++------
 .../runtime/checkpoint/savepoint/Savepoint.java |   3 +-
 .../checkpoint/savepoint/SavepointLoader.java   | 159 ---------
 .../checkpoint/savepoint/SavepointStore.java    | 328 -------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  11 +-
 .../CheckpointCoordinatorConfiguration.java     |  27 +-
 .../tasks/ExternalizedCheckpointSettings.java   |  89 -----
 .../checkpoints/CheckpointConfigHandler.java    |   8 +-
 .../checkpoints/CheckpointConfigHandler.java    |   9 +-
 .../flink/runtime/state/CheckpointStorage.java  |  93 ++++++
 .../state/CheckpointStorageLocation.java        |  65 ++++
 .../runtime/state/CheckpointStreamFactory.java  |  27 ++
 .../flink/runtime/state/StateBackend.java       |  65 ++--
 .../filesystem/AbstractFileStateBackend.java    |  33 +-
 .../filesystem/AbstractFsCheckpointStorage.java | 256 +++++++++++++++
 .../filesystem/FixFileFsStateOutputStream.java  | 154 +++++++++
 .../state/filesystem/FsCheckpointStorage.java   |  85 +++++
 .../filesystem/FsCheckpointStorageLocation.java | 122 +++++++
 .../state/filesystem/FsStateBackend.java        |  11 +
 .../memory/MemoryBackendCheckpointStorage.java  | 135 ++++++++
 .../state/memory/MemoryStateBackend.java        |  24 +-
 ...istentMetadataCheckpointStorageLocation.java |  56 ++++
 ...istentMetadataCheckpointStorageLocation.java |  64 ++++
 .../flink/runtime/jobmanager/JobManager.scala   | 102 +++---
 ...tCoordinatorExternalizedCheckpointsTest.java | 206 ------------
 .../CheckpointCoordinatorFailureTest.java       |   4 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |  20 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 116 +++----
 .../CheckpointExternalResumeTest.java           | 203 ++++++++++++
 .../CheckpointMetadataLoadingTest.java          | 132 ++++++++
 .../checkpoint/CheckpointPropertiesTest.java    |  38 +--
 .../CheckpointSettingsSerializableTest.java     |  28 +-
 .../checkpoint/CheckpointStateRestoreTest.java  |  25 +-
 .../checkpoint/CheckpointStatsHistoryTest.java  |   8 +-
 .../checkpoint/CheckpointStatsSnapshotTest.java |   6 +-
 .../checkpoint/CheckpointStatsTrackerTest.java  |  28 +-
 .../CompletedCheckpointStatsSummaryTest.java    |   4 +-
 .../CompletedCheckpointStoreTest.java           |   6 +-
 .../checkpoint/CompletedCheckpointTest.java     |  51 +--
 .../checkpoint/CoordinatorShutdownTest.java     |   7 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../checkpoint/FailedCheckpointStatsTest.java   |   5 +-
 .../checkpoint/PendingCheckpointStatsTest.java  |   8 +-
 .../checkpoint/PendingCheckpointTest.java       | 143 ++++----
 .../checkpoint/RestoredCheckpointStatsTest.java |   2 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  13 +-
 .../checkpoint/hooks/MasterHooksTest.java       |   2 +
 .../savepoint/SavepointLoaderTest.java          | 124 -------
 .../savepoint/SavepointStoreTest.java           | 308 -----------------
 .../ArchivedExecutionGraphTest.java             |  13 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../IndividualRestartsConcurrencyTest.java      |   9 +-
 .../tasks/JobCheckpointingSettingsTest.java     |   3 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../runtime/jobmanager/JobManagerTest.java      |  16 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   5 +-
 ...obCancellationWithSavepointHandlersTest.java |  10 +-
 .../CheckpointConfigHandlerTest.java            |  31 +-
 .../CheckpointStatsDetailsHandlerTest.java      |   8 +-
 .../checkpoints/CheckpointStatsHandlerTest.java |  11 +-
 .../runtime/state/EmptyStreamStateHandle.java   |  45 +++
 .../AbstractFileStateBackendTest.java           | 309 +++++++++++++++++
 .../FixFileFsStateOutputStreamTest.java         | 285 ++++++++++++++++
 .../memory/MemoryCheckpointStorageTest.java     |  26 ++
 .../runtime/jobmanager/JobManagerITCase.scala   |  12 +-
 .../testingUtils/TestingJobManagerLike.scala    |  29 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  12 +-
 .../tasks/StreamTaskTerminationTest.java        |  18 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |   2 +-
 .../test/checkpointing/SavepointITCase.java     |   9 -
 .../utils/SavepointMigrationTestBase.java       |   2 -
 .../streaming/runtime/StateBackendITCase.java   |  21 +-
 80 files changed, 3167 insertions(+), 2064 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6bcd595..072f3a7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -28,12 +28,14 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
 
@@ -41,7 +43,6 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -300,7 +301,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 
 	private void lazyInitializeForJob(
 			Environment env,
-			String operatorIdentifier) throws IOException {
+			@SuppressWarnings("unused") String operatorIdentifier) throws IOException {
 
 		if (isInitialized) {
 			return;
@@ -352,9 +353,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		return initializedDbBasePaths[ni];
 	}
 
+	// ------------------------------------------------------------------------
+	//  Checkpoint initialization and persistent storage
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+		return checkpointStreamBackend.resolveCheckpoint(pointer);
+	}
+
 	@Override
-	public CheckpointStreamFactory createStreamFactory(JobID jobId,
-			String operatorIdentifier) throws IOException {
+	public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+		return checkpointStreamBackend.createCheckpointStorage(jobId);
+	}
+
+	@Override
+	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
 		return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
 	}
 
@@ -367,6 +381,10 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
 	}
 
+	// ------------------------------------------------------------------------
+	//  State holding data structures
+	// ------------------------------------------------------------------------
+
 	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c79e3d7..6770ec3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -23,13 +23,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -56,10 +56,9 @@ import static org.junit.Assert.assertEquals;
  * Tests to verify JMX reporter functionality on the JobManager.
  */
 public class JMXJobManagerMetricTest {
+
 	/**
 	 * Tests that metrics registered on the JobManager are actually accessible via JMX.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testJobManagerJMXMetricAccess() throws Exception {
@@ -89,7 +88,7 @@ public class JMXJobManagerMetricTest {
 					500,
 					50,
 					5,
-					ExternalizedCheckpointSettings.none(),
+					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 					true),
 				null));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 40fa2bd..95ca5d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,10 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -36,12 +33,14 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -119,12 +118,7 @@ public class CheckpointCoordinator {
 
 	/** The root checkpoint state backend, which is responsible for initializing the
 	 * checkpoint, storing the metadata, and cleaning up the checkpoint */
-	private final StateBackend checkpointStateBackend;
-
-	/** Default directory for persistent checkpoints; <code>null</code> if none configured.
-	 * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
-	@Nullable
-	private final String checkpointDirectory;
+	private final CheckpointStorage checkpointStorage;
 
 	/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
 	private final ArrayDeque<Long> recentPendingCheckpoints;
@@ -194,29 +188,23 @@ public class CheckpointCoordinator {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpointAttempts,
-			ExternalizedCheckpointSettings externalizeSettings,
+			CheckpointRetentionPolicy retentionPolicy,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
-			@Nullable String checkpointDirectory,
 			StateBackend checkpointStateBackend,
 			Executor executor,
 			SharedStateRegistryFactory sharedStateRegistryFactory) {
 
 		// sanity checks
+		checkNotNull(checkpointStateBackend);
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 
-		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
-			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
-					"checkpoints, but no checkpoint directory has been configured. You can " +
-					"configure configure one via key '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + "'.");
-		}
-
 		// max "in between duration" can be one year - this is to prevent numeric overflows
 		if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
 			minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
@@ -239,8 +227,6 @@ public class CheckpointCoordinator {
 		this.pendingCheckpoints = new LinkedHashMap<>();
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
-		this.checkpointStateBackend = checkNotNull(checkpointStateBackend);
-		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
 		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
 		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
@@ -256,14 +242,11 @@ public class CheckpointCoordinator {
 		this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
 		this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
-		if (externalizeSettings.externalizeCheckpoints()) {
-			LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
-			checkpointProperties = CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation());
-		} else {
-			checkpointProperties = CheckpointProperties.forStandardCheckpoint();
-		}
+		this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy);
 
 		try {
+			this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
+
 			// Make sure the checkpoint ID enumerator is running. Possibly
 			// issues a blocking call to ZooKeeper.
 			checkpointIDCounter.start();
@@ -280,7 +263,7 @@ public class CheckpointCoordinator {
 	 * Adds the given master hook to the checkpoint coordinator. This method does nothing, if
 	 * the checkpoint coordinator already contained a hook with the same ID (as defined via
 	 * {@link MasterTriggerRestoreHook#getIdentifier()}).
-	 * 
+	 *
 	 * @param hook The hook to add.
 	 * @return True, if the hook was added, false if the checkpoint coordinator already
 	 *         contained a hook with the same ID.
@@ -366,52 +349,32 @@ public class CheckpointCoordinator {
 	 * Triggers a savepoint with the given savepoint directory as a target.
 	 *
 	 * @param timestamp The timestamp for the savepoint.
-	 * @param targetDirectory Target directory for the savepoint.
+	 * @param targetLocation Target location for the savepoint, optional. If null, the
+	 *                       state backend's configured default will be used.
 	 * @return A future to the completed checkpoint
 	 * @throws IllegalStateException If no savepoint directory has been
 	 *                               specified and no default savepoint directory has been
 	 *                               configured
 	 * @throws Exception             Failures during triggering are forwarded
 	 */
-	public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
-		checkNotNull(targetDirectory, "Savepoint target directory");
-
-		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+	public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+			long timestamp,
+			@Nullable String targetLocation) throws Exception {
 
-		// Create the unique savepoint directory
-		final String savepointDirectory = SavepointStore
-			.createSavepointDirectory(targetDirectory, job);
+		CheckpointProperties props = CheckpointProperties.forSavepoint();
 
 		CheckpointTriggerResult triggerResult = triggerCheckpoint(
 			timestamp,
 			props,
-			savepointDirectory,
+			targetLocation,
 			false);
 
-		CompletableFuture<CompletedCheckpoint> result;
-
 		if (triggerResult.isSuccess()) {
-			result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+			return triggerResult.getPendingCheckpoint().getCompletionFuture();
 		} else {
 			Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
 			return FutureUtils.completedExceptionally(cause);
 		}
-
-		// Make sure to remove the created base directory on Exceptions
-		result.whenCompleteAsync(
-			(CompletedCheckpoint checkpoint, Throwable throwable) -> {
-				if (throwable != null) {
-					try {
-						SavepointStore.deleteSavepointDirectory(savepointDirectory);
-					} catch (Throwable t) {
-						LOG.warn("Failed to delete savepoint directory " + savepointDirectory
-							+ " after failed savepoint.", t);
-					}
-				}
-			},
-			executor);
-
-		return result;
 	}
 
 	/**
@@ -425,7 +388,7 @@ public class CheckpointCoordinator {
 	 * @return <code>true</code> if triggering the checkpoint succeeded.
 	 */
 	public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
-		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
+		return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
 	}
 
 	/**
@@ -444,7 +407,7 @@ public class CheckpointCoordinator {
 
 			case CHECKPOINT:
 				CheckpointTriggerResult triggerResult =
-					triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false);
+					triggerCheckpoint(timestamp, checkpointProperties, null, false);
 
 				if (triggerResult.isSuccess()) {
 					return triggerResult.getPendingCheckpoint().getCompletionFuture();
@@ -462,14 +425,9 @@ public class CheckpointCoordinator {
 	CheckpointTriggerResult triggerCheckpoint(
 			long timestamp,
 			CheckpointProperties props,
-			String targetDirectory,
+			@Nullable String externalSavepointLocation,
 			boolean isPeriodic) {
 
-		// Sanity check
-		if (props.externalizeCheckpoint() && targetDirectory == null) {
-			throw new IllegalStateException("No target directory specified to persist checkpoint to.");
-		}
-
 		// make some eager pre-checks
 		synchronized (lock) {
 			// abort if the coordinator has been shutdown in the meantime
@@ -557,11 +515,18 @@ public class CheckpointCoordinator {
 		// may issue blocking operations. Using a different lock than the coordinator-wide lock,
 		// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
 		synchronized (triggerLock) {
+
+			final CheckpointStorageLocation checkpointStorageLocation;
 			final long checkpointID;
+
 			try {
 				// this must happen outside the coordinator-wide lock, because it communicates
 				// with external services (in HA mode) and may block for a while.
 				checkpointID = checkpointIdCounter.getAndIncrement();
+
+				checkpointStorageLocation = props.isSavepoint() ?
+						checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
+						checkpointStorage.initializeLocationForCheckpoint(checkpointID);
 			}
 			catch (Throwable t) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
@@ -575,7 +540,7 @@ public class CheckpointCoordinator {
 				timestamp,
 				ackTasks,
 				props,
-				targetDirectory,
+				checkpointStorageLocation,
 				executor);
 
 			if (statsTracker != null) {
@@ -588,21 +553,18 @@ public class CheckpointCoordinator {
 			}
 
 			// schedule the timer that will clean up the expired checkpoints
-			final Runnable canceller = new Runnable() {
-				@Override
-				public void run() {
-					synchronized (lock) {
-						// only do the work if the checkpoint is not discarded anyways
-						// note that checkpoint completion discards the pending checkpoint object
-						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+			final Runnable canceller = () -> {
+				synchronized (lock) {
+					// only do the work if the checkpoint is not discarded anyways
+					// note that checkpoint completion discards the pending checkpoint object
+					if (!checkpoint.isDiscarded()) {
+						LOG.info("Checkpoint " + checkpointID + " expired before completing.");
 
-							checkpoint.abortExpired();
-							pendingCheckpoints.remove(checkpointID);
-							rememberRecentCheckpointId(checkpointID);
+						checkpoint.abortExpired();
+						pendingCheckpoints.remove(checkpointID);
+						rememberRecentCheckpointId(checkpointID);
 
-							triggerQueuedRequests();
-						}
+						triggerQueuedRequests();
 					}
 				}
 			};
@@ -675,7 +637,7 @@ public class CheckpointCoordinator {
 				if (!props.isSavepoint()) {
 					checkpointOptions = CheckpointOptions.forCheckpoint();
 				} else {
-					checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+					checkpointOptions = CheckpointOptions.forSavepoint(checkpointStorageLocation.getLocationAsPointer());
 				}
 
 				// send the messages to the tasks that trigger their checkpoint
@@ -699,6 +661,14 @@ public class CheckpointCoordinator {
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
 				}
+
+				try {
+					checkpointStorageLocation.disposeOnFailure();
+				}
+				catch (Throwable t2) {
+					LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
+				}
+
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
@@ -718,7 +688,7 @@ public class CheckpointCoordinator {
 			throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
 				message.getJob() + " while this coordinator handles job " + job);
 		}
-		
+
 		final long checkpointId = message.getCheckpointId();
 		final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
 
@@ -778,7 +748,7 @@ public class CheckpointCoordinator {
 		}
 
 		final long checkpointId = message.getCheckpointId();
-		
+
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -854,7 +824,7 @@ public class CheckpointCoordinator {
 	/**
 	 * Try to complete the given pending checkpoint.
 	 *
-	 * Important: This method should only be called in the checkpoint lock scope.
+	 * <p>Important: This method should only be called in the checkpoint lock scope.
 	 *
 	 * @param pendingCheckpoint to complete
 	 * @throws CheckpointException if the completion failed
@@ -869,13 +839,9 @@ public class CheckpointCoordinator {
 
 		try {
 			try {
-				// externalize the checkpoint if required
-				if (pendingCheckpoint.getProps().externalizeCheckpoint()) {
-					completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized();
-				} else {
-					completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized();
-				}
-			} catch (Exception e1) {
+				completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+			}
+			catch (Exception e1) {
 				// abort the current pending checkpoint if we fails to finalize the pending checkpoint.
 				if (!pendingCheckpoint.isDiscarded()) {
 					pendingCheckpoint.abortError(e1);
@@ -1122,38 +1088,40 @@ public class CheckpointCoordinator {
 	}
 
 	/**
-	 * Restore the state with given savepoint
-	 * 
-	 * @param savepointPath    Location of the savepoint
-	 * @param allowNonRestored True if allowing checkpoint state that cannot be 
+	 * Restore the state with given savepoint.
+	 *
+	 * @param savepointPointer The pointer to the savepoint.
+	 * @param allowNonRestored True if allowing checkpoint state that cannot be
 	 *                         mapped to any job vertex in tasks.
-	 * @param tasks            Map of job vertices to restore. State for these 
-	 *                         vertices is restored via 
+	 * @param tasks            Map of job vertices to restore. State for these
+	 *                         vertices is restored via
 	 *                         {@link Execution#setInitialState(TaskStateSnapshot)}.
-	 * @param userClassLoader  The class loader to resolve serialized classes in 
-	 *                         legacy savepoint versions. 
+	 * @param userClassLoader  The class loader to resolve serialized classes in
+	 *                         legacy savepoint versions.
 	 */
 	public boolean restoreSavepoint(
-			String savepointPath, 
+			String savepointPointer,
 			boolean allowNonRestored,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			ClassLoader userClassLoader) throws Exception {
-		
-		Preconditions.checkNotNull(savepointPath, "The savepoint path cannot be null.");
-		
-		LOG.info("Starting job from savepoint {} ({})", 
-				savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
+
+		Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
+
+		LOG.info("Starting job from savepoint {} ({})",
+				savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+
+		final StreamStateHandle metadataHandle = checkpointStorage.resolveCheckpoint(savepointPointer);
 
 		// Load the savepoint as a checkpoint into the system
-		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
-				job, tasks, savepointPath, userClassLoader, allowNonRestored);
+		CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
+				job, tasks, savepointPointer, metadataHandle, userClassLoader, allowNonRestored);
 
 		completedCheckpointStore.addCheckpoint(savepoint);
-		
+
 		// Reset the checkpoint ID counter
 		long nextCheckpointId = savepoint.getCheckpointID() + 1;
 		checkpointIdCounter.setCount(nextCheckpointId);
-		
+
 		LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
 
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
@@ -1185,6 +1153,10 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	public CheckpointStorage getCheckpointStorage() {
+		return checkpointStorage;
+	}
+
 	public CompletedCheckpointStore getCheckpointStore() {
 		return completedCheckpointStore;
 	}
@@ -1221,7 +1193,7 @@ public class CheckpointCoordinator {
 
 			periodicScheduling = true;
 			currentPeriodicTrigger = timer.scheduleAtFixedRate(
-					new ScheduledTrigger(), 
+					new ScheduledTrigger(),
 					baseInterval, baseInterval, TimeUnit.MILLISECONDS);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 1233b6e..8d6346c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -34,11 +34,10 @@ import java.io.Serializable;
  */
 public class CheckpointProperties implements Serializable {
 
-	private static final long serialVersionUID = -8835900655844879469L;
+	private static final long serialVersionUID = -8835900655844879470L;
 
 	private final boolean forced;
 
-	private final boolean externalize;
 	private final boolean savepoint;
 
 	private final boolean discardSubsumed;
@@ -49,7 +48,6 @@ public class CheckpointProperties implements Serializable {
 
 	CheckpointProperties(
 			boolean forced,
-			boolean externalize,
 			boolean savepoint,
 			boolean discardSubsumed,
 			boolean discardFinished,
@@ -58,20 +56,12 @@ public class CheckpointProperties implements Serializable {
 			boolean discardSuspended) {
 
 		this.forced = forced;
-		this.externalize = externalize;
 		this.savepoint = savepoint;
 		this.discardSubsumed = discardSubsumed;
 		this.discardFinished = discardFinished;
 		this.discardCancelled = discardCancelled;
 		this.discardFailed = discardFailed;
 		this.discardSuspended = discardSuspended;
-
-		// Not persisted, but needs manual clean up
-		if (!externalize && !(discardSubsumed && discardFinished && discardCancelled
-				&& discardFailed && discardSuspended)) {
-			throw new IllegalStateException("CheckpointProperties say to *not* persist the " +
-					"checkpoint, but the checkpoint requires manual cleanup.");
-		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -93,18 +83,6 @@ public class CheckpointProperties implements Serializable {
 		return forced;
 	}
 
-	/**
-	 * Returns whether the checkpoint should be persisted externally.
-	 *
-	 * @return <code>true</code> if the checkpoint should be persisted
-	 * externally; <code>false</code> otherwise.
-	 *
-	 * @see PendingCheckpoint
-	 */
-	boolean externalizeCheckpoint() {
-		return externalize;
-	}
-
 	// ------------------------------------------------------------------------
 	// Garbage collection behaviour
 	// ------------------------------------------------------------------------
@@ -203,7 +181,6 @@ public class CheckpointProperties implements Serializable {
 
 		CheckpointProperties that = (CheckpointProperties) o;
 		return forced == that.forced &&
-				externalize == that.externalize &&
 				savepoint == that.savepoint &&
 				discardSubsumed == that.discardSubsumed &&
 				discardFinished == that.discardFinished &&
@@ -215,7 +192,6 @@ public class CheckpointProperties implements Serializable {
 	@Override
 	public int hashCode() {
 		int result = (forced ? 1 : 0);
-		result = 31 * result + (externalize ? 1 : 0);
 		result = 31 * result + (savepoint ? 1 : 0);
 		result = 31 * result + (discardSubsumed ? 1 : 0);
 		result = 31 * result + (discardFinished ? 1 : 0);
@@ -229,7 +205,6 @@ public class CheckpointProperties implements Serializable {
 	public String toString() {
 		return "CheckpointProperties{" +
 				"forced=" + forced +
-				", externalized=" + externalizeCheckpoint() +
 				", savepoint=" + savepoint +
 				", discardSubsumed=" + discardSubsumed +
 				", discardFinished=" + discardFinished +
@@ -241,8 +216,7 @@ public class CheckpointProperties implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-	private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties(
-			true,
+	private static final CheckpointProperties SAVEPOINT = new CheckpointProperties(
 			true,
 			true,
 			false,
@@ -251,77 +225,66 @@ public class CheckpointProperties implements Serializable {
 			false,
 			false);
 
-	private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties(
-			false,
+	private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties(
 			false,
 			false,
 			true,
-			true,
-			true,
-			true,
-			true);
+			true,  // Delete on success
+			true,  // Delete on cancellation
+			true,  // Delete on failure
+			true); // Delete on suspension
 
-	private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+	private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
 			false,
-			true,
 			false,
 			true,
-			true,
-			false, // Retain on cancellation
-			false,
-			false); // Retain on suspension
+			true,  // Delete on success
+			true,  // Delete on cancellation
+			false, // Retain on failure
+			true); // Delete on suspension
 
-	private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+	private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
 			false,
-			true,
 			false,
 			true,
-			true,
-			true, // Delete on cancellation
-			false,
-			true); // Delete on suspension
+			true,   // Delete on success
+			false,  // Retain on cancellation
+			false,  // Retain on failure
+			false); // Retain on suspension
+
 
 	/**
 	 * Creates the checkpoint properties for a (manually triggered) savepoint.
 	 *
-	 * <p>Savepoints are forced and persisted externally. They have to be
+	 * <p>Time trigger limits never cause savepoints to be queued. They have to be
 	 * garbage collected manually.
 	 *
 	 * @return Checkpoint properties for a (manually triggered) savepoint.
 	 */
-	public static CheckpointProperties forStandardSavepoint() {
-		return STANDARD_SAVEPOINT;
-	}
-
-	/**
-	 * Creates the checkpoint properties for a regular checkpoint.
-	 *
-	 * <p>Regular checkpoints are not forced and not persisted externally. They
-	 * are garbage collected automatically.
-	 *
-	 * @return Checkpoint properties for a regular checkpoint.
-	 */
-	public static CheckpointProperties forStandardCheckpoint() {
-		return STANDARD_CHECKPOINT;
+	public static CheckpointProperties forSavepoint() {
+		return SAVEPOINT;
 	}
 
 	/**
-	 * Creates the checkpoint properties for an external checkpoint.
+	 * Creates the checkpoint properties for a checkpoint.
 	 *
-	 * <p>External checkpoints are not forced, but persisted externally. They
-	 * are garbage collected automatically, except when the owning job
+	 * <p>Checkpoints may be queued in case too many other checkpoints are currently happening.
+	 * They are garbage collected automatically, except when the owning job
 	 * terminates in state {@link JobStatus#FAILED}. The user is required to
 	 * configure the clean up behaviour on job cancellation.
 	 *
-	 * @param deleteOnCancellation Flag indicating whether to discard on cancellation.
-	 *
 	 * @return Checkpoint properties for an external checkpoint.
 	 */
-	public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
-		if (deleteOnCancellation) {
-			return EXTERNALIZED_CHECKPOINT_DELETED;
-		} else {
-			return EXTERNALIZED_CHECKPOINT_RETAINED;
+	public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) {
+		switch (policy) {
+			case NEVER_RETAIN_AFTER_TERMINATION:
+				return CHECKPOINT_NEVER_RETAINED;
+			case RETAIN_ON_FAILURE:
+				return CHECKPOINT_RETAINED_ON_FAILURE;
+			case RETAIN_ON_CANCELLATION:
+				return CHECKPOINT_RETAINED_ON_CANCELLATION;
+			default:
+				throw new IllegalArgumentException("unknown policy: " + policy);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
new file mode 100644
index 0000000..3bd124d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Policy for whether checkpoints are retained after a job terminates.
+ */
+@Internal
+public enum CheckpointRetentionPolicy {
+
+	/** Checkpoints should be retained on cancellation and failure. */
+	RETAIN_ON_CANCELLATION,
+
+	/** Checkpoints should be retained on failure, but not on cancellation. */
+	RETAIN_ON_FAILURE,
+
+	/** Checkpoints should always be cleaned up when an application reaches a terminal state. */
+	NEVER_RETAIN_AFTER_TERMINATION;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
new file mode 100644
index 0000000..bfa7d45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with methods to write, load, and dispose of checkpoint and savepoint metadata.
+ *
+ * <p>Stored checkpoint metadata files have the following format:
+ * <pre>[MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)]</pre>
+ *
+ * <p>The actual savepoint serialization is version-specific via the {@link SavepointSerializer}.
+ */
+public class Checkpoints {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);
+
+	/** Magic number at the beginning of every checkpoint metadata file, for sanity checks. */
+	public static final int HEADER_MAGIC_NUMBER = 0x4960672d;
+
+	// ------------------------------------------------------------------------
+	//  Writing out checkpoint metadata
+	// ------------------------------------------------------------------------
+
+	public static <T extends Savepoint> void storeCheckpointMetadata(
+			T checkpointMetadata,
+			OutputStream out) throws IOException {
+
+		DataOutputStream dos = new DataOutputStream(out);
+		storeCheckpointMetadata(checkpointMetadata, dos);
+	}
+
+	public static <T extends Savepoint> void storeCheckpointMetadata(
+			T checkpointMetadata,
+			DataOutputStream out) throws IOException {
+
+		// write generic header
+		out.writeInt(HEADER_MAGIC_NUMBER);
+		out.writeInt(checkpointMetadata.getVersion());
+
+		// write checkpoint metadata
+		SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(checkpointMetadata);
+		serializer.serialize(checkpointMetadata, out);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reading and validating checkpoint metadata
+	// ------------------------------------------------------------------------
+
+	public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader) throws IOException {
+		checkNotNull(in, "input stream");
+		checkNotNull(classLoader, "classLoader");
+
+		final int magicNumber = in.readInt();
+
+		if (magicNumber == HEADER_MAGIC_NUMBER) {
+			final int version = in.readInt();
+			final SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+
+			if (serializer != null) {
+				return serializer.deserialize(in, classLoader);
+			}
+			else {
+				throw new IOException("Unrecognized checkpoint version number: " + version);
+			}
+		}
+		else {
+			throw new IOException("Unexpected magic number. This can have multiple reasons: " +
+					"(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
+					"version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
+					"(3) The savepoint file has been corrupted.");
+		}
+	}
+
+	@SuppressWarnings("deprecation")
+	public static CompletedCheckpoint loadAndValidateCheckpoint(
+			JobID jobId,
+			Map<JobVertexID, ExecutionJobVertex> tasks,
+			String checkpointPointer,
+			StreamStateHandle metadataHandle,
+			ClassLoader classLoader,
+			boolean allowNonRestoredState) throws IOException {
+
+		checkNotNull(jobId, "jobId");
+		checkNotNull(tasks, "tasks");
+		checkNotNull(checkpointPointer, "checkpointPointer");
+		checkNotNull(metadataHandle, "metadataHandle");
+		checkNotNull(classLoader, "classLoader");
+
+		// (1) load the checkpoint/savepoint metadata
+		final Savepoint rawCheckpointMetadata;
+		try (FSDataInputStream in = metadataHandle.openInputStream()) {
+			DataInputStream dis = new DataInputStream(in);
+			rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
+		}
+
+		final Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null ?
+				rawCheckpointMetadata :
+				SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);
+
+		// generate mapping from operator to task
+		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
+		for (ExecutionJobVertex task : tasks.values()) {
+			for (OperatorID operatorID : task.getOperatorIDs()) {
+				operatorToJobVertexMapping.put(operatorID, task);
+			}
+		}
+
+		// (2) validate it (parallelism, etc)
+		boolean expandedToLegacyIds = false;
+
+		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
+		for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
+
+			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+
+			// The first time we cannot find the execution job vertex for an ID, we also consider alternative IDs,
+			// for example those generated by older Flink versions, to provide backwards compatibility.
+			if (executionJobVertex == null && !expandedToLegacyIds) {
+				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
+				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+				expandedToLegacyIds = true;
+				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
+			}
+
+			if (executionJobVertex != null) {
+
+				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
+						|| !executionJobVertex.isMaxParallelismConfigured()) {
+					operatorStates.put(operatorState.getOperatorID(), operatorState);
+				} else {
+					String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+									"Max parallelism mismatch between checkpoint/savepoint state and new program. " +
+									"Cannot map operator %s with max parallelism %d to new program with " +
+									"max parallelism %d. This indicates that the program has been changed " +
+									"in a non-compatible way after the checkpoint/savepoint.",
+							checkpointMetadata,
+							operatorState.getOperatorID(),
+							operatorState.getMaxParallelism(),
+							executionJobVertex.getMaxParallelism());
+
+					throw new IllegalStateException(msg);
+				}
+			} else if (allowNonRestoredState) {
+				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
+			} else {
+				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
+					if (operatorSubtaskState.hasState()) {
+						String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+										"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
+										"because the operator is not available in the new program. If " +
+										"you want to allow to skip this, you can set the --allowNonRestoredState " +
+										"option on the CLI.",
+								checkpointPointer, operatorState.getOperatorID());
+
+						throw new IllegalStateException(msg);
+					}
+				}
+
+				LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
+			}
+		}
+
+		// (3) convert to checkpoint so the system can fall back to it
+		CheckpointProperties props = CheckpointProperties.forSavepoint();
+
+		return new CompletedCheckpoint(
+				jobId,
+				checkpointMetadata.getCheckpointId(),
+				0L,
+				0L,
+				operatorStates,
+				checkpointMetadata.getMasterStates(),
+				props,
+				metadataHandle,
+				checkpointPointer);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Savepoint Disposal Hooks
+	// ------------------------------------------------------------------------
+
+	public static void disposeSavepoint(
+			String pointer,
+			StateBackend stateBackend,
+			ClassLoader classLoader) throws IOException, FlinkException {
+
+		checkNotNull(pointer, "location");
+		checkNotNull(stateBackend, "stateBackend");
+		checkNotNull(classLoader, "classLoader");
+
+		final StreamStateHandle metadataHandle = stateBackend.resolveCheckpoint(pointer);
+
+		// load the savepoint object (the metadata) to have all the state handles that we need
+		// to dispose of all state
+		final Savepoint savepoint;
+		try (FSDataInputStream in = metadataHandle.openInputStream();
+			DataInputStream dis = new DataInputStream(in)) {
+
+				savepoint = loadCheckpointMetadata(dis, classLoader);
+		}
+
+		Exception exception = null;
+
+		// first dispose the savepoint metadata, so that the savepoint is not
+		// addressable any more even if the following disposal fails
+		try {
+			metadataHandle.discardState();
+		}
+		catch (Exception e) {
+			exception = e;
+		}
+
+		// now dispose the savepoint data
+		try {
+			savepoint.dispose();
+		}
+		catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		// until we have the proper hooks to delete full directories via the checkpoint storage,
+		// we need to have a special case here to remove the empty directory
+		if (stateBackend instanceof AbstractFileStateBackend && metadataHandle instanceof FileStateHandle) {
+			Path dir = ((FileStateHandle) metadataHandle).getFilePath().getParent();
+			FileUtils.deletePathIfEmpty(dir.getFileSystem(), dir);
+		}
+
+		if (exception != null) {
+			ExceptionUtils.rethrowIOException(exception);
+		}
+	}
+
+	public static void disposeSavepoint(
+			String pointer,
+			Configuration configuration,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IOException, FlinkException {
+
+		checkNotNull(pointer, "location");
+		checkNotNull(configuration, "configuration");
+		checkNotNull(classLoader, "classLoader");
+
+		if (logger != null) {
+			logger.info("Attempting to load configured state backend for savepoint disposal");
+		}
+
+		StateBackend backend = null;
+		try {
+			backend = StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
+
+			if (backend == null && logger != null) {
+				logger.info("No state backend configured, attempting to dispose savepoint " +
+						"with default backend (file system based)");
+			}
+		}
+		catch (Throwable t) {
+			// catches exceptions and errors (like linking errors)
+			if (logger != null) {
+				logger.info("Could not load configured state backend.");
+				logger.debug("Detailed exception:", t);
+			}
+		}
+
+		if (backend == null) {
+			// We use the memory state backend by default. The MemoryStateBackend is actually
+			// FileSystem-based for metadata
+			backend = new MemoryStateBackend();
+		}
+
+		disposeSavepoint(pointer, backend, classLoader);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class contains only static utility methods and is not meant to be instantiated. */
+	private Checkpoints() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index d6d0827..801232c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,10 +25,12 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,32 +46,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * and that is considered successful. The CompletedCheckpoint class contains all the metadata of the
  * checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the
  * checkpoint.
- * 
+ *
  * <h2>Size the CompletedCheckpoint Instances</h2>
- * 
- * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
+ *
+ * <p>In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
  * states are only pointers (such as file paths). However, the some state backend implementations may
  * choose to store some payload data directly with the metadata (for example to avoid many small files).
  * If those thresholds are increased to large values, the memory consumption of the CompletedCheckpoint
  * objects can be significant.
- * 
- * <h2>Externalized Metadata</h2>
- * 
- * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage
- * system. In that case, the checkpoint is called <i>externalized</i>.
- * 
- * <p>Externalized checkpoints have an external pointer, which points to the metadata. For example
- * when externalizing to a file system, that pointer is the file path to the checkpoint's folder
+ *
+ * <h2>Metadata Persistence</h2>
+ *
+ * <p>The metadata of the CompletedCheckpoint is also persisted in an external storage
+ * system. Checkpoints have an external pointer, which points to the metadata. For example
+ * when storing a checkpoint in a file system, that pointer is the file path to the checkpoint's folder
  * or the metadata file. For a state backend that stores metadata in database tables, the pointer
  * could be the table name and row key. The pointer is encoded as a String.
- * 
- * <h2>Externalized Metadata and High-availability</h2>
- * 
- * For high availability setups, the checkpoint metadata must be stored persistent and available
- * as well. The high-availability services that stores the checkpoint ground-truth (meaning what are
- * the latest completed checkpoints in what order) often rely on checkpoints being externalized. That
- * way, those services only store pointers to the externalized metadata, rather than the complete
- * metadata itself (for example ZooKeeper's ZNode payload should ideally be less than megabytes).
  */
 public class CompletedCheckpoint implements Serializable {
 
@@ -79,10 +71,10 @@ public class CompletedCheckpoint implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-	/** The ID of the job that the checkpoint belongs to */
+	/** The ID of the job that the checkpoint belongs to. */
 	private final JobID job;
 
-	/** The ID (logical timestamp) of the checkpoint */
+	/** The ID (logical timestamp) of the checkpoint. */
 	private final long checkpointID;
 
 	/** The timestamp when the checkpoint was triggered. */
@@ -91,21 +83,19 @@ public class CompletedCheckpoint implements Serializable {
 	/** The duration of the checkpoint (completion timestamp - trigger timestamp). */
 	private final long duration;
 
-	/** States of the different operator groups belonging to this checkpoint */
+	/** States of the different operator groups belonging to this checkpoint. */
 	private final Map<OperatorID, OperatorState> operatorStates;
 
 	/** Properties for this checkpoint. */
 	private final CheckpointProperties props;
 
-	/** States that were created by a hook on the master (in the checkpoint coordinator) */
+	/** States that were created by a hook on the master (in the checkpoint coordinator). */
 	private final Collection<MasterState> masterHookStates;
 
-	/** The state handle to the externalized meta data, if the metadata has been externalized */
-	@Nullable
-	private final StreamStateHandle externalizedMetadata;
+	/** The state handle to the persisted checkpoint metadata. */
+	private final StreamStateHandle metadataHandle;
 
-	/** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */
-	@Nullable
+	/** External pointer to the completed checkpoint (for example file path). */
 	private final String externalPointer;
 
 	/** Optional stats tracker callback for discard. */
@@ -122,19 +112,13 @@ public class CompletedCheckpoint implements Serializable {
 			Map<OperatorID, OperatorState> operatorStates,
 			@Nullable Collection<MasterState> masterHookStates,
 			CheckpointProperties props,
-			@Nullable StreamStateHandle externalizedMetadata,
-			@Nullable String externalPointer) {
+			StreamStateHandle metadataHandle,
+			String externalPointer) {
 
 		checkArgument(checkpointID >= 0);
 		checkArgument(timestamp >= 0);
 		checkArgument(completionTimestamp >= 0);
 
-		checkArgument((externalPointer == null) == (externalizedMetadata == null),
-				"external pointer without externalized metadata must be both null or both non-null");
-
-		checkArgument(!props.externalizeCheckpoint() || externalPointer != null,
-			"Checkpoint properties require externalized checkpoint, but checkpoint is not externalized");
-
 		this.job = checkNotNull(job);
 		this.checkpointID = checkpointID;
 		this.timestamp = timestamp;
@@ -148,8 +132,8 @@ public class CompletedCheckpoint implements Serializable {
 				new ArrayList<>(masterHookStates);
 
 		this.props = checkNotNull(props);
-		this.externalizedMetadata = externalizedMetadata;
-		this.externalPointer = externalPointer;
+		this.metadataHandle = checkNotNull(metadataHandle);
+		this.externalPointer = checkNotNull(externalPointer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -216,12 +200,10 @@ public class CompletedCheckpoint implements Serializable {
 			Exception exception = null;
 
 			// drop the metadata, if we have some
-			if (externalizedMetadata != null) {
-				try {
-					externalizedMetadata.discardState();
-				} catch (Exception e) {
-					exception = e;
-				}
+			try {
+				metadataHandle.discardState();
+			} catch (Exception e) {
+				exception = e;
 			}
 
 			// discard private state objects
@@ -263,16 +245,10 @@ public class CompletedCheckpoint implements Serializable {
 		return Collections.unmodifiableCollection(masterHookStates);
 	}
 
-	public boolean isExternalized() {
-		return externalizedMetadata != null;
+	public StreamStateHandle getMetadataHandle() {
+		return metadataHandle;
 	}
 
-	@Nullable
-	public StreamStateHandle getExternalizedMetadata() {
-		return externalizedMetadata;
-	}
-
-	@Nullable
 	public String getExternalPointer() {
 		return externalPointer;
 	}
@@ -314,10 +290,7 @@ public class CompletedCheckpoint implements Serializable {
 
 		CompletedCheckpoint that = (CompletedCheckpoint) o;
 
-		if (checkpointID != that.checkpointID) {
-			return false;
-		}
-		return job.equals(that.job);
+		return checkpointID == that.checkpointID && job.equals(that.job);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index a9b6d4d..5e8559d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
  * acknowledged it, it becomes a {@link CompletedCheckpoint}.
- * 
+ *
  * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
  * state handles always as serialized values, never as actual values.
  */
@@ -74,7 +73,7 @@ public class PendingCheckpoint {
 
 	// ------------------------------------------------------------------------
 
-	/** The PendingCheckpoint logs to the same logger as the CheckpointCoordinator */
+	/** The PendingCheckpoint logs to the same logger as the CheckpointCoordinator. */
 	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
 
 	private final Object lock = new Object();
@@ -91,20 +90,19 @@ public class PendingCheckpoint {
 
 	private final List<MasterState> masterState;
 
-	/** Set of acknowledged tasks */
+	/** Set of acknowledged tasks. */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
-	/** The checkpoint properties. If the checkpoint should be persisted
-	 * externally, it happens in {@link #finalizeCheckpointExternalized()}. */
+	/** The checkpoint properties. */
 	private final CheckpointProperties props;
 
-	/** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
-	private final String targetDirectory;
+	/** Target storage location to persist the checkpoint metadata to. */
+	private final CheckpointStorageLocation targetLocation;
 
 	/** The promise to fulfill once the checkpoint has been completed. */
 	private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
-	/** The executor for potentially blocking I/O operations, like state disposal */
+	/** The executor for potentially blocking I/O operations, like state disposal. */
 	private final Executor executor;
 
 	private int numAcknowledgedTasks;
@@ -125,14 +123,9 @@ public class PendingCheckpoint {
 			long checkpointTimestamp,
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
 			CheckpointProperties props,
-			String targetDirectory,
+			CheckpointStorageLocation targetLocation,
 			Executor executor) {
 
-		// Sanity check
-		if (props.externalizeCheckpoint() && targetDirectory == null) {
-			throw new NullPointerException("No target directory specified to persist checkpoint to.");
-		}
-
 		checkArgument(verticesToConfirm.size() > 0,
 				"Checkpoint needs at least one vertex that commits the checkpoint");
 
@@ -141,7 +134,7 @@ public class PendingCheckpoint {
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
 		this.props = checkNotNull(props);
-		this.targetDirectory = targetDirectory;
+		this.targetLocation = checkNotNull(targetLocation);
 		this.executor = Preconditions.checkNotNull(executor);
 
 		this.operatorStates = new HashMap<>();
@@ -195,7 +188,7 @@ public class PendingCheckpoint {
 	/**
 	 * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
 	 * of newer checkpoints in progress.
-	 * 
+	 *
 	 * @return True if the checkpoint can be subsumed, false otherwise.
 	 */
 	public boolean canBeSubsumed() {
@@ -207,10 +200,6 @@ public class PendingCheckpoint {
 		return props;
 	}
 
-	String getTargetDirectory() {
-		return targetDirectory;
-	}
-
 	/**
 	 * Sets the callback for tracking this pending checkpoint.
 	 *
@@ -223,7 +212,7 @@ public class PendingCheckpoint {
 	/**
 	 * Sets the handle for the canceller to this pending checkpoint. This method fails
 	 * with an exception if a handle has already been set.
-	 * 
+	 *
 	 * @return true, if the handle was set, false, if the checkpoint is already disposed;
 	 */
 	public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
@@ -255,103 +244,60 @@ public class PendingCheckpoint {
 		return onCompletionPromise;
 	}
 
-	public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
+	public CompletedCheckpoint finalizeCheckpoint() throws IOException {
 
 		synchronized (lock) {
 			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
 
 			// make sure we fulfill the promise with an exception if something fails
 			try {
-				// externalize the metadata
+				// write out the metadata
 				final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState);
+				final StreamStateHandle metadataHandle;
 
-				// TEMP FIX - The savepoint store is strictly typed to file systems currently
-				//            but the checkpoints think more generic. we need to work with file handles
-				//            here until the savepoint serializer accepts a generic stream factory
-
-				// We have this branch here, because savepoints and externalized checkpoints
-				// currently behave differently.
-				// Savepoints:
-				//   - Metadata file in unique directory
-				//   - External pointer can be the directory
-				// Externalized checkpoints:
-				//   - Multiple metadata files per directory possible (need to be unique)
-				//   - External pointer needs to be the file itself
-				//
-				// This should be unified as part of the JobManager metadata stream factories.
-				if (props.isSavepoint()) {
-					final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
-					final String externalPointer = metadataHandle.getFilePath().getParent().toString();
-	
-					return finalizeInternal(metadataHandle, externalPointer);
-				} else {
-					final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint);
-					final String externalPointer = metadataHandle.getFilePath().toString();
+				try (CheckpointStateOutputStream out = targetLocation.createMetadataOutputStream()) {
+					Checkpoints.storeCheckpointMetadata(savepoint, out);
+					metadataHandle = out.closeAndGetHandle();
+				}
 
-					return finalizeInternal(metadataHandle, externalPointer);
+				final String externalPointer = targetLocation.markCheckpointAsFinished();
+
+				CompletedCheckpoint completed = new CompletedCheckpoint(
+						jobId,
+						checkpointId,
+						checkpointTimestamp,
+						System.currentTimeMillis(),
+						operatorStates,
+						masterState,
+						props,
+						metadataHandle,
+						externalPointer);
+
+				onCompletionPromise.complete(completed);
+
+				// to prevent null-pointers from concurrent modification, copy reference onto stack
+				PendingCheckpointStats statsCallback = this.statsCallback;
+				if (statsCallback != null) {
+					// Finalize the statsCallback and give the completed checkpoint a
+					// callback for discards.
+					CompletedCheckpointStats.DiscardCallback discardCallback =
+							statsCallback.reportCompletedCheckpoint(externalPointer);
+					completed.setDiscardCallback(discardCallback);
 				}
-			}
-			catch (Throwable t) {
-				onCompletionPromise.completeExceptionally(t);
-				ExceptionUtils.rethrowIOException(t);
-				return null; // silence the compiler
-			}
-		}
-	}
 
-	public CompletedCheckpoint finalizeCheckpointNonExternalized() {
-		synchronized (lock) {
-			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+				// mark this pending checkpoint as disposed, but do NOT drop the state
+				dispose(false);
 
-			// make sure we fulfill the promise with an exception if something fails
-			try {
-				// finalize without external metadata
-				return finalizeInternal(null, null);
+				return completed;
 			}
 			catch (Throwable t) {
 				onCompletionPromise.completeExceptionally(t);
-				ExceptionUtils.rethrow(t);
+				ExceptionUtils.rethrowIOException(t);
 				return null; // silence the compiler
 			}
 		}
 	}
 
-	@GuardedBy("lock")
-	private CompletedCheckpoint finalizeInternal(
-			@Nullable StreamStateHandle externalMetadata,
-			@Nullable String externalPointer) {
-
-		assert(Thread.holdsLock(lock));
-
-		CompletedCheckpoint completed = new CompletedCheckpoint(
-				jobId,
-				checkpointId,
-				checkpointTimestamp,
-				System.currentTimeMillis(),
-				operatorStates,
-				masterState,
-				props,
-				externalMetadata,
-				externalPointer);
-
-		onCompletionPromise.complete(completed);
-
-		// to prevent null-pointers from concurrent modification, copy reference onto stack
-		PendingCheckpointStats statsCallback = this.statsCallback;
-		if (statsCallback != null) {
-			// Finalize the statsCallback and give the completed checkpoint a
-			// callback for discards.
-			CompletedCheckpointStats.DiscardCallback discardCallback = 
-					statsCallback.reportCompletedCheckpoint(externalPointer);
-			completed.setDiscardCallback(discardCallback);
-		}
-
-		// mark this pending checkpoint as disposed, but do NOT drop the state
-		dispose(false);
-
-		return completed;
-	}
-
 	/**
 	 * Acknowledges the task with the given execution attempt id and the given subtask state.
 	 *
@@ -528,6 +474,7 @@ public class PendingCheckpoint {
 							// unregistered shared states are still considered private at this point.
 							try {
 								StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+								targetLocation.disposeOnFailure();
 							} catch (Throwable t) {
 								LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.",
 									checkpointId, jobId, t);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index a7cf4b5..d7966e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -34,8 +34,7 @@ import java.util.Collection;
  * we allow different savepoint implementations (see subclasses of this
  * interface).
  *
- * <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
- * via a {@link SavepointStore}.
+ * <p>Savepoints are serialized via a {@link SavepointSerializer}.
  */
 public interface Savepoint extends Versioned {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
deleted file mode 100644
index 31d9124..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
- */
-public class SavepointLoader {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SavepointLoader.class);
-
-	/**
-	 * Loads a savepoint back as a {@link CompletedCheckpoint}.
-	 *
-	 * <p>This method verifies that tasks and parallelism still match the savepoint parameters.
-	 *
-	 * @param jobId          The JobID of the job to load the savepoint for.
-	 * @param tasks          Tasks that will possibly be reset
-	 * @param savepointPath  The path of the savepoint to rollback to
-	 * @param classLoader    The class loader to resolve serialized classes in legacy savepoint versions.
-	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
-	 * to any job vertex in tasks.
-	 *
-	 * @throws IllegalStateException If mismatch between program and savepoint state
-	 * @throws IOException             If savepoint store failure
-	 */
-	public static CompletedCheckpoint loadAndValidateSavepoint(
-			JobID jobId,
-			Map<JobVertexID, ExecutionJobVertex> tasks,
-			String savepointPath,
-			ClassLoader classLoader,
-			boolean allowNonRestoredState) throws IOException {
-
-		// (1) load the savepoint
-		final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle = 
-				SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
-
-		Savepoint savepoint = savepointAndHandle.f0;
-		final StreamStateHandle metadataHandle = savepointAndHandle.f1;
-
-		if (savepoint.getTaskStates() != null) {
-			savepoint = SavepointV2.convertToOperatorStateSavepointV2(tasks, savepoint);
-		}
-		// generate mapping from operator to task
-		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
-		for (ExecutionJobVertex task : tasks.values()) {
-			for (OperatorID operatorID : task.getOperatorIDs()) {
-				operatorToJobVertexMapping.put(operatorID, task);
-			}
-		}
-
-		// (2) validate it (parallelism, etc)
-		boolean expandedToLegacyIds = false;
-
-		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getOperatorStates().size());
-		for (OperatorState operatorState : savepoint.getOperatorStates()) {
-
-			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-
-			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-			// for example as generated from older flink versions, to provide backwards compatibility.
-			if (executionJobVertex == null && !expandedToLegacyIds) {
-				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
-				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-				expandedToLegacyIds = true;
-				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
-			}
-
-			if (executionJobVertex != null) {
-
-				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
-						|| !executionJobVertex.isMaxParallelismConfigured()) {
-					operatorStates.put(operatorState.getOperatorID(), operatorState);
-				} else {
-					String msg = String.format("Failed to rollback to savepoint %s. " +
-									"Max parallelism mismatch between savepoint state and new program. " +
-									"Cannot map operator %s with max parallelism %d to new program with " +
-									"max parallelism %d. This indicates that the program has been changed " +
-									"in a non-compatible way after the savepoint.",
-							savepoint,
-							operatorState.getOperatorID(),
-							operatorState.getMaxParallelism(),
-							executionJobVertex.getMaxParallelism());
-
-					throw new IllegalStateException(msg);
-				}
-			} else if (allowNonRestoredState) {
-				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
-			} else {
-				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
-					if (operatorSubtaskState.hasState()) {
-						String msg = String.format("Failed to rollback to savepoint %s. " +
-								"Cannot map savepoint state for operator %s to the new program, " +
-								"because the operator is not available in the new program. If " +
-								"you want to allow to skip this, you can set the --allowNonRestoredState " +
-								"option on the CLI.",
-							savepointPath, operatorState.getOperatorID());
-
-						throw new IllegalStateException(msg);
-					}
-				}
-				LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
-			}
-		}
-
-		// (3) convert to checkpoint so the system can fall back to it
-		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-
-		return new CompletedCheckpoint(
-			jobId,
-			savepoint.getCheckpointId(),
-			0L,
-			0L,
-			operatorStates,
-			savepoint.getMasterStates(),
-			props,
-			metadataHandle,
-			savepointPath);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** This class is not meant to be instantiated */
-	private SavepointLoader() {}
-}