You are viewing a plain text version of this content. The canonical (HTML) version, with its original link, is available from the mailing-list archive page this text was extracted from.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:21 UTC

[04/17] flink git commit: [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
deleted file mode 100644
index 586df57..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Utilities for storing and loading savepoint meta data files.
- *
- * <p>Stored savepoints have the following format:
- * <pre>
- * MagicNumber SavepointVersion Savepoint
- *   - MagicNumber => int
- *   - SavepointVersion => int (returned by Savepoint#getVersion())
- *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
- * </pre>
- */
-public class SavepointStore {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
-
-	/** Magic number for sanity checks against stored savepoints. */
-	public static final int MAGIC_NUMBER = 0x4960672d;
-
-	private static final String SAVEPOINT_METADATA_FILE = "_metadata";
-
-	/**
-	 * Metadata file for an externalized checkpoint, random suffix added
-	 * during store, because the parent directory is not unique.
-	 */
-	static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-";
-
-	/**
-	 * Creates a savepoint directory.
-	 *
-	 * @param baseDirectory Base target directory for the savepoint
-	 * @param jobId Optional JobID the savepoint belongs to
-	 * @return The created savepoint directory
-	 * @throws IOException FileSystem operation failures are forwarded
-	 */
-	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
-		final Path basePath = new Path(baseDirectory);
-		final FileSystem fs = basePath.getFileSystem();
-
-		final String prefix;
-		if (jobId == null) {
-			prefix = "savepoint-";
-		} else {
-			prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
-		}
-
-		Exception latestException = null;
-
-		// Try to create a FS output stream
-		for (int attempt = 0; attempt < 10; attempt++) {
-			Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
-
-			try {
-				if (fs.mkdirs(path)) {
-					return path.toString();
-				}
-			} catch (Exception e) {
-				latestException = e;
-			}
-		}
-
-		throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
-	}
-
-	/**
-	 * Deletes a savepoint directory.
-	 *
-	 * @param savepointDirectory Recursively deletes the given directory
-	 * @throws IOException FileSystem operation failures are forwarded
-	 */
-	public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
-		Path path = new Path(savepointDirectory);
-		FileSystem fs = FileSystem.get(path.toUri());
-		fs.delete(path, true);
-	}
-
-	/**
-	 * Stores the savepoint metadata file.
-	 *
-	 * @param <T>       Savepoint type
-	 * @param directory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 * @return Path of stored savepoint
-	 * @throws IOException Failures during store are forwarded
-	 */
-	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
-		// write and create the file handle
-		FileStateHandle metadataFileHandle = storeSavepointToHandle(directory,
-			SAVEPOINT_METADATA_FILE, savepoint);
-
-		// we return the savepoint directory path here!
-		// The directory path also works to resume from and is more elegant than the direct
-		// metadata file pointer
-		return metadataFileHandle.getFilePath().getParent().toString();
-	}
-
-	/**
-	 * Stores the savepoint metadata file to a state handle.
-	 *
-	 * @param directory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 *
-	 * @return State handle to the checkpoint metadata
-	 * @throws IOException Failures during store are forwarded
-	 */
-	public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException {
-		return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint);
-	}
-
-	/**
-	 * Stores the externalized checkpoint metadata file to a state handle.
-	 *
-	 * @param directory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 *
-	 * @return State handle to the checkpoint metadata
-	 * @throws IOException Failures during store are forwarded
-	 */
-	public static <T extends Savepoint> FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException {
-		String fileName = FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE);
-		return storeSavepointToHandle(directory, fileName, savepoint);
-	}
-
-	/**
-	 * Stores the savepoint metadata file to a state handle.
-	 *
-	 * @param directory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 *
-	 * @return State handle to the checkpoint metadata
-	 * @throws IOException Failures during store are forwarded
-	 */
-	static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
-			String directory,
-			String filename,
-			T savepoint) throws IOException {
-
-		checkNotNull(directory, "Target directory");
-		checkNotNull(savepoint, "Savepoint");
-
-		final Path basePath = new Path(directory);
-		final Path metadataFilePath = new Path(basePath, filename);
-
-		final FileSystem fs = FileSystem.get(basePath.toUri());
-
-		boolean success = false;
-		try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
-				DataOutputStream dos = new DataOutputStream(fdos))
-		{
-			// Write header
-			dos.writeInt(MAGIC_NUMBER);
-			dos.writeInt(savepoint.getVersion());
-
-			// Write savepoint
-			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
-			serializer.serialize(savepoint, dos);
-
-			// construct result handle
-			FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size());
-
-			// all good!
-			success = true;
-			return handle;
-		}
-		finally {
-			if (!success && fs.exists(metadataFilePath)) {
-				if (!fs.delete(metadataFilePath, true)) {
-					LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Loads the savepoint at the specified path.
-	 *
-	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
-	 * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
-	 * @return The loaded savepoint
-	 * 
-	 * @throws IOException Failures during load are forwarded
-	 */
-	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException {
-		return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0;
-	}
-
-	/**
-	 * Loads the savepoint at the specified path. This methods returns the savepoint, as well as the
-	 * handle to the metadata.
-	 *
-	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
-	 * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
-	 * @return The loaded savepoint
-	 *
-	 * @throws IOException Failures during load are forwarded
-	 */
-	public static Tuple2<Savepoint, StreamStateHandle> loadSavepointWithHandle(
-			String savepointFileOrDirectory,
-			ClassLoader classLoader) throws IOException {
-		
-		checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory");
-		checkNotNull(classLoader, "classLoader");
-
-		Path path = new Path(savepointFileOrDirectory);
-
-		LOG.info("Loading savepoint from {}", path);
-
-		FileSystem fs = FileSystem.get(path.toUri());
-
-		FileStatus status = fs.getFileStatus(path);
-
-		// If this is a directory, we need to find the meta data file
-		if (status.isDir()) {
-			Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE);
-			if (fs.exists(candidatePath)) {
-				path = candidatePath;
-				LOG.info("Using savepoint file in {}", path);
-			} else {
-				throw new IOException("Cannot find meta data file in directory " + path
-						+ ". Please try to load the savepoint directly from the meta data file "
-						+ "instead of the directory.");
-			}
-		}
-
-		// load the savepoint
-		final Savepoint savepoint;
-		try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
-			int magicNumber = dis.readInt();
-
-			if (magicNumber == MAGIC_NUMBER) {
-				int version = dis.readInt();
-
-				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
-				savepoint = serializer.deserialize(dis, classLoader);
-			} else {
-				throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " +
-						"(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
-						"version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
-						"(3) The savepoint file has been corrupted.");
-			}
-		}
-
-		// construct the stream handle to the metadata file
-		// we get the size best-effort
-		long size = 0;
-		try {
-			size = fs.getFileStatus(path).getLen();
-		}
-		catch (Exception ignored) {
-			// we don't know the size, but we don't want to fail the savepoint loading for that
-		}
-		StreamStateHandle metadataHandle = new FileStateHandle(path, size);
-
-		return new Tuple2<>(savepoint, metadataHandle);
-	}
-
-	/**
-	 * Removes the savepoint meta data w/o loading and disposing it.
-	 *
-	 * @param path Path of savepoint to remove
-	 * @throws IOException Failures during disposal are forwarded
-	 */
-	public static void removeSavepointFile(String path) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
-
-		try {
-			LOG.info("Removing savepoint: {}.", path);
-
-			Path filePath = new Path(path);
-			FileSystem fs = FileSystem.get(filePath.toUri());
-
-			if (fs.exists(filePath)) {
-				if (!fs.delete(filePath, true)) {
-					throw new IOException("Failed to delete " + filePath + ".");
-				}
-			} else {
-				throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
-			}
-		} catch (Throwable t) {
-			throw new IOException("Failed to dispose savepoint " + path + ".", t);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f88a6b62..5c2f0f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -444,14 +444,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
-			ExternalizedCheckpointSettings externalizeSettings,
+			CheckpointRetentionPolicy retentionPolicy,
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
 			List<MasterTriggerRestoreHook<?>> masterHooks,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
-			String checkpointDir,
 			StateBackend checkpointStateBackend,
 			CheckpointStatsTracker statsTracker) {
 
@@ -475,13 +474,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointTimeout,
 			minPauseBetweenCheckpoints,
 			maxConcurrentCheckpoints,
-			externalizeSettings,
+			retentionPolicy,
 			tasksToTrigger,
 			tasksToWaitFor,
 			tasksToCommitTo,
 			checkpointIDCounter,
 			checkpointStore,
-			checkpointDir,
 			checkpointStateBackend,
 			ioExecutor,
 			SharedStateRegistry.DEFAULT_FACTORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c742903..fe70bb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -186,7 +186,7 @@ public class ExecutionGraphBuilder {
 		// configure the state checkpointing
 		JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
 		if (snapshotSettings != null) {
-			List<ExecutionJobVertex> triggerVertices = 
+			List<ExecutionJobVertex> triggerVertices =
 					idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
 
 			List<ExecutionJobVertex> ackVertices =
@@ -242,7 +242,7 @@ public class ExecutionGraphBuilder {
 				try {
 					applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
 				} catch (IOException | ClassNotFoundException e) {
-					throw new JobExecutionException(jobId, 
+					throw new JobExecutionException(jobId,
 							"Could not deserialize application-defined state backend.", e);
 				}
 			}
@@ -295,14 +295,13 @@ public class ExecutionGraphBuilder {
 				chkConfig.getCheckpointTimeout(),
 				chkConfig.getMinPauseBetweenCheckpoints(),
 				chkConfig.getMaxConcurrentCheckpoints(),
-				chkConfig.getExternalizedCheckpointSettings(),
+				chkConfig.getCheckpointRetentionPolicy(),
 				triggerVertices,
 				ackVertices,
 				confirmVertices,
 				hooks,
 				checkpointIdCounter,
 				completedCheckpoints,
-				externalizedCheckpointsDir,
 				rootBackend,
 				checkpointStatsTracker);
 		}
@@ -331,7 +330,7 @@ public class ExecutionGraphBuilder {
 			} else {
 				throw new IllegalArgumentException(
 						"The snapshot checkpointing settings refer to non-existent vertex " + id);
-			} 
+			}
 		}
 
 		return result;
@@ -339,6 +338,6 @@ public class ExecutionGraphBuilder {
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not supposed to be instantiated */
+	/** This class is not supposed to be instantiated. */
 	private ExecutionGraphBuilder() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index e00a6d4..4ecbda5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -31,7 +32,7 @@ import java.util.Objects;
  */
 public class CheckpointCoordinatorConfiguration implements Serializable {
 
-	private static final long serialVersionUID = -647384516034982626L;
+	private static final long serialVersionUID = 2L;
 
 	private final long checkpointInterval;
 
@@ -41,8 +42,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 
 	private final int maxConcurrentCheckpoints;
 
-	/** Settings for externalized checkpoints. */
-	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+	/** Settings for what to do with checkpoints when a job finishes. */
+	private final CheckpointRetentionPolicy checkpointRetentionPolicy;
 
 	/**
 	 * Flag indicating whether exactly once checkpoint mode has been configured.
@@ -58,7 +59,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
-			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			CheckpointRetentionPolicy checkpointRetentionPolicy,
 			boolean isExactlyOnce) {
 
 		// sanity checks
@@ -71,7 +72,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 		this.checkpointTimeout = checkpointTimeout;
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-		this.externalizedCheckpointSettings = Preconditions.checkNotNull(externalizedCheckpointSettings);
+		this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
 		this.isExactlyOnce = isExactlyOnce;
 	}
 
@@ -91,8 +92,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 		return maxConcurrentCheckpoints;
 	}
 
-	public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
-		return externalizedCheckpointSettings;
+	public CheckpointRetentionPolicy getCheckpointRetentionPolicy() {
+		return checkpointRetentionPolicy;
 	}
 
 	public boolean isExactlyOnce() {
@@ -113,12 +114,18 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
 			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
 			isExactlyOnce == that.isExactlyOnce &&
-			Objects.equals(externalizedCheckpointSettings, that.externalizedCheckpointSettings);
+			checkpointRetentionPolicy == that.checkpointRetentionPolicy;
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointSettings, isExactlyOnce);
+		return Objects.hash(
+				checkpointInterval,
+				checkpointTimeout,
+				minPauseBetweenCheckpoints,
+				maxConcurrentCheckpoints,
+				checkpointRetentionPolicy,
+				isExactlyOnce);
 	}
 
 	@Override
@@ -128,7 +135,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 			", checkpointTimeout=" + checkpointTimeout +
 			", minPauseBetweenCheckpoints=" + minPauseBetweenCheckpoints +
 			", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
-			", externalizedCheckpointSettings=" + externalizedCheckpointSettings +
+			", checkpointRetentionPolicy=" + checkpointRetentionPolicy +
 			'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
deleted file mode 100644
index f432796..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.Objects;
-
-/**
- * Grouped settings for externalized checkpoints.
- */
-@Internal
-public class ExternalizedCheckpointSettings implements java.io.Serializable {
-
-	private static final long serialVersionUID = -6271691851124392955L;
-
-	private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false);
-
-	/** Flag indicating whether checkpoints should be externalized. */
-	private final boolean externalizeCheckpoints;
-
-	/** Flag indicating whether externalized checkpoints should delete on cancellation. */
-	private final boolean deleteOnCancellation;
-
-	private ExternalizedCheckpointSettings(boolean externalizeCheckpoints, boolean deleteOnCancellation) {
-		this.externalizeCheckpoints = externalizeCheckpoints;
-		this.deleteOnCancellation = deleteOnCancellation;
-	}
-
-	/**
-	 * Returns <code>true</code> if checkpoints should be externalized.
-	 *
-	 * @return <code>true</code> if checkpoints should be externalized.
-	 */
-	public boolean externalizeCheckpoints() {
-		return externalizeCheckpoints;
-	}
-
-	/**
-	 * Returns <code>true</code> if externalized checkpoints should be deleted on cancellation.
-	 *
-	 * @return <code>true</code> if externalized checkpoints should be deleted on cancellation.
-	 */
-	public boolean deleteOnCancellation() {
-		return deleteOnCancellation;
-	}
-
-	public static ExternalizedCheckpointSettings externalizeCheckpoints(boolean deleteOnCancellation) {
-		return new ExternalizedCheckpointSettings(true, deleteOnCancellation);
-	}
-
-	public static ExternalizedCheckpointSettings none() {
-		return NONE;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		ExternalizedCheckpointSettings that = (ExternalizedCheckpointSettings) o;
-		return externalizeCheckpoints == that.externalizeCheckpoints &&
-			deleteOnCancellation == that.deleteOnCancellation;
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(externalizeCheckpoints, deleteOnCancellation);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 91c9ae1..b88183e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -71,11 +71,11 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
 				"Checkpointing is not enabled for this job (" + executionGraph.getJobID() + ").",
 				HttpResponseStatus.NOT_FOUND);
 		} else {
-			ExternalizedCheckpointSettings externalizedCheckpointSettings = checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings();
+			CheckpointRetentionPolicy retentionPolicy = checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy();
 
 			CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(
-				externalizedCheckpointSettings.externalizeCheckpoints(),
-				externalizedCheckpointSettings.deleteOnCancellation());
+					retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+					retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
 
 			return new CheckpointConfigInfo(
 				checkpointCoordinatorConfiguration.isExactlyOnce() ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE : CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 69cc55f..fbaa484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
@@ -100,12 +100,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MIN_PAUSE, jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
 			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MAX_CONCURRENT, jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
 
-			ExternalizedCheckpointSettings externalization = jobCheckpointingConfiguration.getExternalizedCheckpointSettings();
+			CheckpointRetentionPolicy retentionPolicy = jobCheckpointingConfiguration.getCheckpointRetentionPolicy();
 			gen.writeObjectFieldStart(CheckpointConfigInfo.FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG);
 			{
-				if (externalization.externalizeCheckpoints()) {
+				if (retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION) {
 					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, true);
-					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION, externalization.deleteOnCancellation());
+					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION,
+							retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
 				} else {
 					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, false);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
new file mode 100644
index 0000000..ea1015a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+	/**
+	 * Checks whether this checkpoint storage supports highly available storage of data.
+	 *
+	 * <p>Some state backends may not support highly available storage with their default settings,
+	 * which makes them suitable for zero-config prototyping, but not for actual production setups.
+	 */
+	boolean supportsHighlyAvailableStorage();
+
+	/**
+	 * Checks whether the storage has a default savepoint location configured.
+	 */
+	boolean hasDefaultSavepointLocation();
+
+	/**
+	 * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the
+	 * checkpoint metadata can be read. If the state backend cannot understand the format of
+	 * the pointer (for example because it was created by a different state backend) this method
+	 * should throw an {@code IOException}.
+	 *
+	 * @param pointer The pointer to resolve.
+	 * @return The state handle from which one can read the checkpoint metadata.
+	 *
+	 * @throws IOException Thrown, if the state backend does not understand the pointer, or if
+	 *                     the pointer could not be resolved due to an I/O error.
+	 */
+	StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+	/**
+	 * Initializes a storage location for a new checkpoint with the given ID.
+	 *
+	 * <p>The returned storage location can be used to write the checkpoint data and metadata
+	 * to and to obtain the pointers for the location(s) where the actual checkpoint data should be
+	 * stored.
+	 *
+	 * @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted.
+	 * @return A storage location for the data and metadata of the given checkpoint.
+	 *
+	 * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception.
+	 */
+	CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;
+
+	/**
+	 * Initializes a storage location for a new savepoint with the given ID.
+	 *
+	 * <p>If an external location pointer is passed, the savepoint storage location
+	 * will be initialized at the location of that pointer. If the external location pointer is null,
+	 * the default savepoint location will be used. If no default savepoint location is configured,
+	 * this will throw an exception. Whether a default savepoint location is configured can be
+	 * checked via {@link #hasDefaultSavepointLocation()}.
+	 *
+	 * @param checkpointId The ID (logical timestamp) of the savepoint's checkpoint.
+	 * @param externalLocationPointer Optionally, a pointer to the location where the savepoint should
+	 *                                be stored. May be null.
+	 *
+	 * @return A storage location for the data and metadata of the savepoint.
+	 *
+	 * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception.
+	 */
+	CheckpointStorageLocation initializeLocationForSavepoint(
+			long checkpointId,
+			@Nullable String externalLocationPointer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
new file mode 100644
index 0000000..fbc4805
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is typically
+ * created and initialized via {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+	/**
+	 * Creates the output stream to persist the checkpoint metadata to.
+	 *
+	 * @return The output stream to persist the checkpoint metadata to.
+	 * @throws IOException Thrown, if the stream cannot be opened due to an I/O error.
+	 */
+	CheckpointStateOutputStream createMetadataOutputStream() throws IOException;
+
+	/**
+	 * Finalizes the checkpoint, marking the location as a finished checkpoint.
+	 * This method returns the external checkpoint pointer that can be used to resolve
+	 * the checkpoint upon recovery.
+	 *
+	 * @return The external pointer to the checkpoint at this location.
+	 * @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error.
+	 */
+	String markCheckpointAsFinished() throws IOException;
+
+	/**
+	 * Disposes the checkpoint location in case the checkpoint has failed.
+	 *
+	 * @throws IOException Thrown, if the location cannot be disposed due to an I/O error.
+	 */
+	void disposeOnFailure() throws IOException;
+
+	/**
+	 * Gets the location encoded as a string pointer.
+	 *
+	 * <p>This pointer is used to send the target storage location via checkpoint RPC messages
+	 * and checkpoint barriers, in a format avoiding backend-specific classes. The encoding is
+	 * typically backend-specific; for example, file-based backends can encode paths here.
+	 */
+	String getLocationAsPointer();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index 73113ec..c94b0e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,6 +43,14 @@ public interface CheckpointStreamFactory {
 	/**
 	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
 	 *
+	 * <p><b>Important:</b> When closing this stream after the successful case, you must
+	 * call {@link #closeAndGetHandle()} - only that method will actually retain the resource
+	 * written to. The method has the semantics of "close on success".
+	 * The {@link #close()} method is supposed to remove the target resource if
+	 * called before {@link #closeAndGetHandle()}, hence having the semantics of
+	 * "close on failure". That way, simple try-with-resources statements automatically
+	 * clean up unsuccessful partial state resources in case the writing does not complete.
+	 *
 	 * <p>Note: This is an abstract class and not an interface because {@link OutputStream}
 	 * is an abstract class.
 	 */
@@ -51,9 +60,27 @@ public interface CheckpointStreamFactory {
 		 * Closes the stream and gets a state handle that can create an input stream
 		 * producing the data written to this stream.
 		 *
+		 * <p>This closing must be called (also when the caller is not interested in the handle)
+		 * to successfully close the stream and retain the produced resource. In contrast,
+		 * the {@link #close()} method removes the target resource when called.
+		 *
 		 * @return A state handle that can create an input stream producing the data written to this stream.
 		 * @throws IOException Thrown, if the stream cannot be closed.
 		 */
 		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+
+		/**
+		 * This method should close the stream, if it has not been closed before.
+		 * If this method actually closes the stream, it should delete/release the
+		 * resource behind the stream, such as the file that the stream writes to.
+		 *
+		 * <p>The above implies that this method is intended to be the "unsuccessful close",
+		 * such as when cancelling the stream writing, or when an exception occurs.
+		 * Closing the stream for the successful case must go through {@link #closeAndGetHandle()}.
+		 *
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		@Override
+		public abstract void close() throws IOException;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 7961b5e..3d3fda2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -73,8 +74,7 @@ import java.io.IOException;
  * states stores that provide access to the persistent storage and hold the keyed- and operator
  * state data structures. That way, the State Backend can be very lightweight (contain only
  * configurations) which makes it easier to be serializable.
- * 
- * 
+ *
  * <h2>Thread Safety</h2>
  * 
  * State backend implementations have to be thread-safe. Multiple threads may be creating
@@ -84,13 +84,44 @@ import java.io.IOException;
 public interface StateBackend extends java.io.Serializable {
 
 	// ------------------------------------------------------------------------
-	//  Persistent Bytes Storage
+	//  Checkpoint storage - the durable persistence of checkpoint data
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the
+	 * checkpoint metadata can be read. If the state backend cannot understand the format of
+	 * the pointer (for example because it was created by a different state backend) this method
+	 * should throw an {@code IOException}.
+	 *
+	 * @param pointer The pointer to resolve.
+	 * @return The state handle from which one can read the checkpoint metadata.
+	 *
+	 * @throws IOException Thrown, if the state backend does not understand the pointer, or if
+	 *                     the pointer could not be resolved due to an I/O error.
+	 */
+	StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+	/**
+	 * Creates the checkpoint storage for the given job. The checkpoint storage is
+	 * used to write the job's checkpoint data and metadata.
+	 *
+	 * @param jobId The job to store checkpoint data for.
+	 * @return A checkpoint storage for the given job.
+	 *
+	 * @throws IOException Thrown if the checkpoint storage cannot be initialized.
+	 */
+	CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;
+
+	// ------------------------------------------------------------------------
+	//  Persistent bytes storage for checkpoint data
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
 	 * that should end up in a checkpoint.
 	 *
+	 * <p>NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
+	 *
 	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
 	 * @param operatorIdentifier An identifier of the operator for which we create streams.
 	 */
@@ -104,6 +135,8 @@ public interface StateBackend extends java.io.Serializable {
 	 * this will return the same factory as for regular checkpoints, but maybe
 	 * slightly adjusted.
 	 *
+	 * <p>NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
+	 *
 	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
 	 * @param operatorIdentifier An identifier of the operator for which we create streams.
 	 * @param targetLocation An optional custom location for the savepoint stream.
@@ -124,21 +157,13 @@ public interface StateBackend extends java.io.Serializable {
 	/**
 	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
 	 * and checkpointing it.
-	 * 
+	 *
 	 * <p><i>Keyed State</i> is state where each value is bound to a key.
-	 * 
-	 * @param env
-	 * @param jobID
-	 * @param operatorIdentifier
-	 * @param keySerializer
-	 * @param numberOfKeyGroups
-	 * @param keyGroupRange
-	 * @param kvStateRegistry
-	 * 
+	 *
 	 * @param <K> The type of the keys by which the state is organized.
-	 *     
+	 *
 	 * @return The Keyed State Backend for the given job, operator, and key group range.
-	 * 
+	 *
 	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
 	 */
 	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
@@ -149,18 +174,18 @@ public interface StateBackend extends java.io.Serializable {
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception;
-
+	
 	/**
 	 * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
-	 * 
+	 *
 	 * <p>Operator state is state that is associated with parallel operator (or function) instances,
 	 * rather than with keys.
-	 * 
+	 *
 	 * @param env The runtime environment of the executing task.
 	 * @param operatorIdentifier The identifier of the operator whose state should be stored.
-	 * 
+	 *
 	 * @return The OperatorStateBackend for operator identified by the job and operator identifier.
-	 * 
+	 *
 	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
 	 */
 	OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
index 6ec6f24..0b622c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
@@ -25,11 +25,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
 import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.net.URI;
 
 /**
@@ -45,8 +45,8 @@ import java.net.URI;
  *
  * <h1>Checkpoint Layout</h1>
  *
- * The state backend is configured with a base directory and persists the checkpoint data of specific
- * checkpoints in specific subdirectories. For example, if the base directory was set to 
+ * <p>The state backend is configured with a base directory and persists the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base directory was set to
  * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with
  * the job's ID that will contain the actual checkpoints:
  * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
@@ -56,17 +56,20 @@ import java.net.URI;
  *
  * <h1>Savepoint Layout</h1>
  *
- * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
  * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data.
  * The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
  */
 @PublicEvolving
 public abstract class AbstractFileStateBackend extends AbstractStateBackend {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStateBackend.class);
-
 	// ------------------------------------------------------------------------
 	//  State Backend Properties
 	// ------------------------------------------------------------------------
@@ -118,7 +121,7 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
 	 *
 	 * @param baseCheckpointPath The checkpoint base directory to use (or null).
 	 * @param baseSavepointPath The default savepoint directory to use (or null).
-	 * @param configuration The configuration to read values from 
+	 * @param configuration The configuration to read values from.
 	 */
 	protected AbstractFileStateBackend(
 			@Nullable Path baseCheckpointPath,
@@ -154,10 +157,18 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Initialization and metadata storage
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+		return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer); // parsed as a file system path
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	// 
 	/**
 	 * Checks the validity of the path's scheme and path.
 	 *
@@ -203,4 +214,4 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
new file mode 100644
index 0000000..1254ee2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
+
+	// ------------------------------------------------------------------------
+	//  Constants
+	// ------------------------------------------------------------------------
+
+	/** The prefix of the directory containing the data exclusive to a checkpoint. */
+	public static final String CHECKPOINT_DIR_PREFIX = "chk-";
+
+	/** The name of the directory for shared checkpoint state. */
+	public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";
+
+	/** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
+	public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";
+
+	/** The name of the metadata files in checkpoints / savepoints. */
+	public static final String METADATA_FILE_NAME = "_metadata";
+
+	// ------------------------------------------------------------------------
+	//  Fields and properties
+	// ------------------------------------------------------------------------
+
+	/** The jobId, written into the generated savepoint directories. */
+	private final JobID jobId;
+
+	/** The default location for savepoints. Null, if none is configured. */
+	@Nullable
+	private final Path defaultSavepointDirectory;
+
+	/**
+	 * Creates a new checkpoint storage.
+	 *
+	 * @param jobId The ID of the job that writes the checkpoints.
+	 * @param defaultSavepointDirectory The default location for savepoints, or null, if none is set.
+	 */
+	protected AbstractFsCheckpointStorage(
+			JobID jobId,
+			@Nullable Path defaultSavepointDirectory) {
+
+		this.jobId = checkNotNull(jobId);
+		this.defaultSavepointDirectory = defaultSavepointDirectory;
+	}
+
+	/**
+	 * Gets the default directory for savepoints. Returns null, if no default savepoint
+	 * directory is configured.
+	 */
+	@Nullable
+	public Path getDefaultSavepointDirectory() {
+		return defaultSavepointDirectory;
+	}
+
+	// ------------------------------------------------------------------------
+	//  CheckpointStorage implementation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean hasDefaultSavepointLocation() {
+		return defaultSavepointDirectory != null;
+	}
+
+	@Override
+	public StreamStateHandle resolveCheckpoint(String checkpointPointer) throws IOException {
+		return resolveCheckpointPointer(checkpointPointer);
+	}
+
+	/**
+	 * Creates a file system based storage location for a savepoint.
+	 *
+	 * <p>This method implements the logic that decides which location to use (given optional
+	 * parameters for a configured location and a location passed for this specific savepoint)
+	 * and how to name and initialize the savepoint directory.
+	 *
+	 * @param externalLocationPointer    The target location pointer for the savepoint.
+	 *                                   Must be a valid URI. Null, if not supplied.
+	 * @param checkpointId               The checkpoint ID of the savepoint.
+	 *
+	 * @return The checkpoint storage location for the savepoint.
+	 *
+	 * @throws IOException Thrown if the target directory could not be created.
+	 */
+	@Override
+	public FsCheckpointStorageLocation initializeLocationForSavepoint(
+			@SuppressWarnings("unused") long checkpointId,
+			@Nullable String externalLocationPointer) throws IOException {
+
+		// determine where to write the savepoint to
+
+		final Path savepointBasePath;
+		if (externalLocationPointer != null) {
+			savepointBasePath = new Path(externalLocationPointer);
+		}
+		else if (defaultSavepointDirectory != null) {
+			savepointBasePath = defaultSavepointDirectory;
+		}
+		else {
+			throw new IllegalArgumentException("No savepoint location given and no default location configured.");
+		}
+
+		// generate the savepoint directory
+
+		final FileSystem fs = savepointBasePath.getFileSystem();
+		final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';
+
+		Exception latestException = null;
+		for (int attempt = 0; attempt < 10; attempt++) {
+			final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));
+
+			try {
+				if (fs.mkdirs(path)) {
+					return new FsCheckpointStorageLocation(fs, path, path, path);
+				}
+			} catch (Exception e) {
+				latestException = e;
+			}
+		}
+
+		throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Creating and resolving paths
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Builds directory into which a specific job checkpoints, meaning the directory inside which
+	 * it creates the checkpoint-specific subdirectories.
+	 *
+	 * <p>The job's checkpoint directory is a subdirectory of the given base checkpoint
+	 * directory, named after the job's ID.
+	 *
+	 * @param baseCheckpointPath The base checkpoint directory that contains the per-job directories.
+	 * @param jobId The ID of the job.
+	 *
+	 * @return The job's checkpoint directory.
+	 */
+	protected static Path getCheckpointDirectoryForJob(Path baseCheckpointPath, JobID jobId) {
+		return new Path(baseCheckpointPath, jobId.toString());
+	}
+
+	/**
+	 * Creates the directory path for the data exclusive to a specific checkpoint.
+	 *
+	 * @param baseDirectory The base directory into which the job checkpoints.
+	 * @param checkpointId The ID (logical timestamp) of the checkpoint.
+	 */
+	protected static Path createCheckpointDirectory(Path baseDirectory, long checkpointId) {
+		return new Path(baseDirectory, CHECKPOINT_DIR_PREFIX + checkpointId);
+	}
+
+	/**
+	 * Takes the given string (representing a pointer to a checkpoint) and resolves it to a state
+	 * handle for the checkpoint's metadata file (looked up by name if the pointer is a directory).
+	 *
+	 * @param checkpointPointer The pointer to resolve.
+	 * @return A state handle to checkpoint/savepoint's metadata.
+	 *
+	 * @throws IOException Thrown, if the pointer cannot be resolved, the file system not accessed, or
+	 *                     the pointer points to a location that does not seem to be a checkpoint/savepoint.
+	 */
+	protected static StreamStateHandle resolveCheckpointPointer(String checkpointPointer) throws IOException {
+		checkNotNull(checkpointPointer, "checkpointPointer");
+		checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
+
+		// check if the pointer is in fact a valid file path
+		final Path path;
+		try {
+			path = new Path(checkpointPointer);
+		}
+		catch (Exception e) {
+			throw new IOException("Checkpoint/savepoint path '" + checkpointPointer + "' is not a valid file URI. " +
+					"Either the pointer path is invalid, or the checkpoint was created by a different state backend.", e);
+		}
+
+		// check if the file system can be accessed
+		final FileSystem fs;
+		try {
+			fs = path.getFileSystem();
+		}
+		catch (IOException e) {
+			throw new IOException("Cannot access file system for checkpoint/savepoint path '" +
+					checkpointPointer + "'.", e);
+		}
+
+		final FileStatus status;
+		try {
+			status = fs.getFileStatus(path);
+		}
+		catch (FileNotFoundException e) {
+			throw new FileNotFoundException("Cannot find checkpoint or savepoint " +
+					"file/directory '" + checkpointPointer + "' on file system '" + fs.getUri().getScheme() + "'.");
+		}
+
+		// if we are here, the file / directory exists
+		final FileStatus metadataFileStatus;
+
+		// If this is a directory, we need to find the meta data file
+		if (status.isDir()) {
+			final Path metadataFilePath = new Path(path, METADATA_FILE_NAME);
+			try {
+				metadataFileStatus = fs.getFileStatus(metadataFilePath);
+			}
+			catch (FileNotFoundException e) {
+				throw new FileNotFoundException("Cannot find meta data file '" + METADATA_FILE_NAME +
+						"' in directory '" + path + "'. Please try to load the checkpoint/savepoint " +
+						"directly from the metadata file instead of the directory.");
+			}
+		}
+		else {
+			// the pointer refers directly to a file, which is taken as the metadata file
+			// as-is, without validating that its name is METADATA_FILE_NAME
+			metadataFileStatus = status;
+		}
+
+		return new FileStateHandle(metadataFileStatus.getPath(), metadataFileStatus.getLen());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
new file mode 100644
index 0000000..bc1ca01
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * <p>Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ *
+ * <p>A stream that is closed via {@link #close()} (instead of {@link #closeAndGetHandle()})
+ * is discarded: the file is closed and then deleted, both on a best-effort basis.
+ */
+public final class FixFileFsStateOutputStream extends CheckpointStateOutputStream {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FixFileFsStateOutputStream.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The output stream to the target file, opened in the constructor. */
+	private final FSDataOutputStream out;
+
+	/** The file that this stream writes to. */
+	private final Path path;
+
+	/** The file system on which {@link #path} is created and, on discard, deleted. */
+	private final FileSystem fileSystem;
+
+	/** Set once {@link #close()} or {@link #closeAndGetHandle()} has been called. */
+	private volatile boolean closed;
+
+	/**
+	 * Creates a new stream that writes to the given file. The file is created with
+	 * {@link WriteMode#NO_OVERWRITE}.
+	 *
+	 * @param fileSystem The file system to create the file on.
+	 * @param path The path of the file to write.
+	 *
+	 * @throws IOException Thrown, if the file could not be created.
+	 */
+	public FixFileFsStateOutputStream(FileSystem fileSystem, Path path) throws IOException {
+		this.fileSystem = checkNotNull(fileSystem);
+		this.path = checkNotNull(path);
+
+		this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  I/O
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void write(int b) throws IOException {
+		out.write(b);
+	}
+
+	@Override
+	public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
+		out.write(b, off, len);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return out.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		out.flush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		out.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Closing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether this stream has been closed, either via {@link #close()} or
+	 * via {@link #closeAndGetHandle()}.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	/**
+	 * Discards the stream: closes the underlying file stream and deletes the file.
+	 * Both steps are best-effort; failures are logged and not rethrown. Calling this
+	 * on an already closed stream does nothing.
+	 */
+	@Override
+	public void close() {
+		if (!closed) {
+			closed = true;
+
+			// Close and delete are independent steps, so that a failure while closing
+			// does not prevent the partially written file from being deleted.
+			try {
+				out.close();
+			}
+			catch (Throwable t) {
+				LOG.warn("Could not close the state stream for {}.", path, t);
+			}
+
+			try {
+				fileSystem.delete(path, false);
+			}
+			catch (Throwable t) {
+				LOG.warn("Could not delete the checkpoint stream file {}.", path, t);
+			}
+		}
+	}
+
+	/**
+	 * Closes the stream and returns a handle to the written file.
+	 *
+	 * <p>The handle's size is the stream position right before closing, or 0 if the
+	 * position could not be determined.
+	 *
+	 * @return The handle to the written file.
+	 *
+	 * @throws IOException Thrown, if the stream was already closed, or if closing the
+	 *                     stream failed. In the latter case, the file is deleted (best-effort).
+	 */
+	@Override
+	public FileStateHandle closeAndGetHandle() throws IOException {
+		synchronized (this) {
+			if (!closed) {
+				try {
+					// make a best effort attempt to figure out the size
+					long size = 0;
+					try {
+						size = out.getPos();
+					} catch (Exception ignored) {
+						// the size is informational only; fall back to 0
+					}
+
+					// close and return
+					out.close();
+
+					return new FileStateHandle(path, size);
+				}
+				catch (Exception e) {
+					try {
+						fileSystem.delete(path, false);
+					}
+					catch (Exception deleteException) {
+						LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException);
+					}
+
+					throw new IOException("Could not flush and close the file system " +
+							"output stream to " + path + " in order to obtain the " +
+							"stream state handle", e);
+				}
+				finally {
+					closed = true;
+				}
+			}
+			else {
+				throw new IOException("Stream has already been closed and discarded.");
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
new file mode 100644
index 0000000..b7be8fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Durable checkpoint storage on a file system.
+ *
+ * <p>Below the job's checkpoint directory, this storage maintains a directory for shared
+ * state and one for task-owned state. Each checkpoint additionally gets its own exclusive
+ * directory, which is handed out as part of an {@link FsCheckpointStorageLocation}.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+	/** The file system holding all directories of this storage. */
+	private final FileSystem fileSystem;
+
+	/** The job-specific checkpoint directory; per-checkpoint directories are created in here. */
+	private final Path checkpointsDirectory;
+
+	/** Directory handed to every checkpoint location as its shared state directory. */
+	private final Path sharedStateDirectory;
+
+	/** Directory handed to every checkpoint location as its task-owned state directory. */
+	private final Path taskOwnedStateDirectory;
+
+	/**
+	 * Creates the storage and eagerly creates the job's checkpoint directory together
+	 * with its shared-state and task-owned-state sub directories.
+	 *
+	 * @param checkpointBaseDirectory The base directory under which the job's directory lives.
+	 * @param defaultSavepointDirectory The default savepoint directory, or null if none is set.
+	 * @param jobId The ID of the job writing the checkpoints.
+	 *
+	 * @throws IOException Thrown, if the file system cannot be accessed.
+	 */
+	public FsCheckpointStorage(
+			Path checkpointBaseDirectory,
+			@Nullable Path defaultSavepointDirectory,
+			JobID jobId) throws IOException {
+
+		super(jobId, defaultSavepointDirectory);
+
+		fileSystem = checkpointBaseDirectory.getFileSystem();
+
+		final Path jobDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
+		checkpointsDirectory = jobDirectory;
+		sharedStateDirectory = new Path(jobDirectory, CHECKPOINT_SHARED_STATE_DIR);
+		taskOwnedStateDirectory = new Path(jobDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
+
+		// parent first, then the two dedicated sub directories
+		final Path[] dedicatedDirectories = {
+				checkpointsDirectory, sharedStateDirectory, taskOwnedStateDirectory};
+		for (Path directory : dedicatedDirectories) {
+			fileSystem.mkdirs(directory);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  CheckpointStorage implementation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsHighlyAvailableStorage() {
+		return true;
+	}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+		checkArgument(checkpointId >= 0);
+
+		// the directory that belongs to this checkpoint alone
+		final Path exclusiveDirectory = createCheckpointDirectory(checkpointsDirectory, checkpointId);
+		fileSystem.mkdirs(exclusiveDirectory);
+
+		return new FsCheckpointStorageLocation(
+				fileSystem, exclusiveDirectory, sharedStateDirectory, taskOwnedStateDirectory);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
new file mode 100644
index 0000000..829ab9a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStorageLocation} rooted in a checkpoint directory on a file system.
+ *
+ * <p>The checkpoint metadata goes to a file with a fixed name inside that directory. The
+ * location's pointer is the fully qualified path of the checkpoint directory.
+ */
+public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
+
+	/** The file system on which all the directories live. */
+	private final FileSystem fileSystem;
+
+	/** The directory exclusive to this checkpoint. */
+	private final Path checkpointDirectory;
+
+	/** The shared state directory of the owning storage. */
+	private final Path sharedStateDirectory;
+
+	/** The task-owned state directory of the owning storage. */
+	private final Path taskOwnedStateDirectory;
+
+	/** The metadata file inside {@link #checkpointDirectory}. */
+	private final Path metadataFilePath;
+
+	/** The checkpoint directory, qualified against {@link #fileSystem}, as a string. */
+	private final String qualifiedCheckpointDirectory;
+
+	public FsCheckpointStorageLocation(
+			FileSystem fileSystem,
+			Path checkpointDir,
+			Path sharedStateDir,
+			Path taskOwnedStateDir) {
+
+		// validate in declaration order so the same argument is reported first
+		this.fileSystem = checkNotNull(fileSystem);
+		this.checkpointDirectory = checkNotNull(checkpointDir);
+		this.sharedStateDirectory = checkNotNull(sharedStateDir);
+		this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
+
+		// derived paths
+		final Path validatedDir = this.checkpointDirectory;
+		this.metadataFilePath = new Path(validatedDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+		this.qualifiedCheckpointDirectory = validatedDir.makeQualified(this.fileSystem).toString();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public Path getMetadataFilePath() {
+		return metadataFilePath;
+	}
+
+	public Path getCheckpointDirectory() {
+		return checkpointDirectory;
+	}
+
+	public Path getSharedStateDirectory() {
+		return sharedStateDirectory;
+	}
+
+	public Path getTaskOwnedStateDirectory() {
+		return taskOwnedStateDirectory;
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpoint metadata
+	// ------------------------------------------------------------------------
+
+	@Override
+	public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
+		final CheckpointStateOutputStream metadataStream =
+				new FixFileFsStateOutputStream(fileSystem, metadataFilePath);
+		return metadataStream;
+	}
+
+	@Override
+	public String markCheckpointAsFinished() throws IOException {
+		return qualifiedCheckpointDirectory;
+	}
+
+	@Override
+	public void disposeOnFailure() throws IOException {
+		// a failed checkpoint keeps nothing from its exclusive directory,
+		// so the directory is removed recursively
+		final boolean recursive = true;
+		fileSystem.delete(checkpointDirectory, recursive);
+	}
+
+	@Override
+	public String getLocationAsPointer() {
+		return qualifiedCheckpointDirectory;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return new StringBuilder("FsCheckpointStorageLocation {")
+				.append("metadataFilePath=").append(metadataFilePath)
+				.append(", taskOwnedStateDirectory=").append(taskOwnedStateDirectory)
+				.append(", sharedStateDirectory=").append(sharedStateDirectory)
+				.append(", checkpointDirectory=").append(checkpointDirectory)
+				.append('}')
+				.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2fff45a..de49552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
@@ -434,6 +435,12 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 	// ------------------------------------------------------------------------
 
 	@Override
+	public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+		checkNotNull(jobId, "jobId");
+		return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId);
+	}
+
+	/**
+	 * Creates a checkpoint stream factory for the given job, based on this backend's
+	 * checkpoint path and minimum file size threshold. The operator identifier is not used.
+	 */
+	@Override
 	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
 		return new FsCheckpointStreamFactory(getCheckpointPath(), jobId, getMinFileSizeThreshold());
 	}
@@ -447,6 +454,10 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, getMinFileSizeThreshold());
 	}
 
+	// ------------------------------------------------------------------------
+	//  state holding structures
+	// ------------------------------------------------------------------------
+
 	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
new file mode 100644
index 0000000..3fb2627
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Checkpoint storage used by the {@link MemoryStateBackend}.
+ *
+ * <p>If a checkpoint base directory is configured, the checkpoint metadata is written
+ * durably into a per-checkpoint directory below the job's checkpoint directory. Otherwise,
+ * checkpoints are not persisted and are not externally addressable.
+ */
+public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {
+
+	/** The job's checkpoint directory (holds metadata files only), or null if not configured. */
+	@Nullable
+	private final Path checkpointsDirectory;
+
+	/** The file system of the checkpoint directory, or null if checkpoints are not persisted. */
+	@Nullable
+	private final FileSystem fileSystem;
+
+	/**
+	 * Creates a storage that neither persists checkpoints nor has a default savepoint
+	 * directory. Savepoints are still possible when an explicit target location is passed to
+	 * {@link #initializeLocationForSavepoint(long, String)}.
+	 *
+	 * @param jobId The ID of the job writing the checkpoints.
+	 */
+	public MemoryBackendCheckpointStorage(JobID jobId) {
+		super(jobId, null);
+		this.fileSystem = null;
+		this.checkpointsDirectory = null;
+	}
+
+	/**
+	 * Creates a storage that persists checkpoint metadata if a base directory is given.
+	 *
+	 * @param jobId The ID of the job writing the checkpoints.
+	 * @param checkpointsBaseDirectory The base directory for checkpoints, or null, in which
+	 *                                 case checkpoints are not persisted durably.
+	 * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
+	 *
+	 * @throws IOException Thrown if a checkpoint base directory is configured and the
+	 *                     job's checkpoint directory cannot be created within it.
+	 */
+	public MemoryBackendCheckpointStorage(
+			JobID jobId,
+			@Nullable Path checkpointsBaseDirectory,
+			@Nullable Path defaultSavepointLocation) throws IOException {
+
+		super(jobId, defaultSavepointLocation);
+
+		final boolean persistent = checkpointsBaseDirectory != null;
+
+		this.fileSystem = persistent ? checkpointsBaseDirectory.getFileSystem() : null;
+		this.checkpointsDirectory = persistent
+				? getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId)
+				: null;
+
+		if (persistent) {
+			fileSystem.mkdirs(checkpointsDirectory);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpoint Storage
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsHighlyAvailableStorage() {
+		// only supported when the metadata is written to a checkpoint directory
+		return checkpointsDirectory != null;
+	}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+		checkArgument(checkpointId >= 0);
+
+		if (checkpointsDirectory == null) {
+			// no durable metadata (typical for IDE or test setups)
+			return new NonPersistentMetadataCheckpointStorageLocation();
+		}
+
+		checkState(fileSystem != null);
+
+		// durable metadata: every checkpoint gets its own exclusive directory
+		final Path exclusiveCheckpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
+		fileSystem.mkdirs(exclusiveCheckpointDir);
+
+		return new PersistentMetadataCheckpointStorageLocation(fileSystem, exclusiveCheckpointDir);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final Object checkpointsDescription =
+				checkpointsDirectory == null ? "(not persistent)" : checkpointsDirectory;
+
+		final Path savepointDirectory = getDefaultSavepointDirectory();
+		final Object savepointDescription = savepointDirectory == null ? "(none)" : savepointDirectory;
+
+		return getClass().getName() + " - " + checkpointsDescription +
+				", default savepoint dir: " + savepointDescription;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2079a97..afcf9a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
@@ -278,14 +279,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Creates the checkpoint storage for the given job, passing this backend's checkpoint
+	 * and savepoint paths. If the checkpoint path is null (presumably when none is configured),
+	 * the storage does not persist checkpoint metadata.
+	 */
 	@Override
-	public OperatorStateBackend createOperatorStateBackend(
-		Environment env,
-		String operatorIdentifier) throws Exception {
-
-		return new DefaultOperatorStateBackend(
-			env.getUserClassLoader(),
-			env.getExecutionConfig(),
-			isUsingAsynchronousSnapshots());
+	public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+		return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath());
 	}
 
 	@Override
@@ -304,10 +299,21 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 	}
 
 	// ------------------------------------------------------------------------
-	//  checkpoint state persistence
+	//  state holding structures
 	// ------------------------------------------------------------------------
 
 	@Override
+	public OperatorStateBackend createOperatorStateBackend(
+			Environment env,
+			String operatorIdentifier) throws Exception {
+
+		return new DefaultOperatorStateBackend(
+				env.getUserClassLoader(),
+				env.getExecutionConfig(),
+				isUsingAsynchronousSnapshots());
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env, JobID jobID,
 			String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
new file mode 100644
index 0000000..3baa319
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
+ * for metadata has been configured.
+ */
+public class NonPersistentMetadataCheckpointStorageLocation implements CheckpointStorageLocation {
+
+	/** Pointer returned for finished checkpoints, since they are not externally addressable. */
+	public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";
+
+	/** Size limit for the serialized checkpoint metadata (the largest possible int). */
+	private static final int MAX_METADATA_STATE_SIZE = Integer.MAX_VALUE;
+
+	@Override
+	public String getLocationAsPointer() {
+		// reuses the location pointer constant of the persistent variant
+		return PersistentMetadataCheckpointStorageLocation.LOCATION_POINTER;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
+		// no durable persistence configured: metadata goes to a MemoryCheckpointOutputStream
+		final CheckpointStateOutputStream metadataStream =
+				new MemoryCheckpointOutputStream(MAX_METADATA_STATE_SIZE);
+		return metadataStream;
+	}
+
+	@Override
+	public String markCheckpointAsFinished() {
+		return EXTERNAL_POINTER;
+	}
+
+	@Override
+	public void disposeOnFailure() {
+		// nothing was written to a file system, so there is nothing to clean up
+	}