Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:49 UTC

[06/11] flink git commit: [FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems

[FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems

This is the first step towards checkpoints that can also be externalized to other stores,
such as key/value stores and databases, if supported by the state backend.
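
For context, a minimal sketch of how the checkpoint coordinator is expected to pick the
finalization path after this change (method names as introduced in the diff below; error
handling omitted):

    CompletedCheckpoint completedCheckpoint;
    if (pendingCheckpoint.getProps().externalizeCheckpoint()) {
        // writes the checkpoint metadata out (currently via the savepoint store) and
        // records a StreamStateHandle plus a String pointer to it
        completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized();
    } else {
        // keeps the metadata only with the completed checkpoint store
        completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized();
    }
    completedCheckpointStore.addCheckpoint(completedCheckpoint);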


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b7f21d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b7f21d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b7f21d8

Branch: refs/heads/master
Commit: 5b7f21d891b410ca0046efdaf12caf5e73deadf4
Parents: 9912de2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 22:18:50 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |   4 +-
 .../checkpoint/CheckpointCoordinator.java       |  20 ++-
 .../runtime/checkpoint/CompletedCheckpoint.java | 135 +++++++++++++++----
 .../checkpoint/CompletedCheckpointStore.java    |   9 ++
 .../runtime/checkpoint/PendingCheckpoint.java   | 112 ++++++++++-----
 .../StandaloneCompletedCheckpointStore.java     |   4 +
 .../ZooKeeperCompletedCheckpointStore.java      |   5 +
 .../checkpoint/savepoint/SavepointLoader.java   |  19 ++-
 .../checkpoint/savepoint/SavepointStore.java    |  93 ++++++++++---
 .../apache/flink/runtime/state/StateUtil.java   |  17 +--
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../CheckpointCoordinatorFailureTest.java       |   5 +
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/CompletedCheckpointTest.java     |  17 ++-
 .../checkpoint/PendingCheckpointTest.java       |  25 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +
 .../runtime/jobmanager/JobManagerITCase.scala   |   2 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +-
 18 files changed, 365 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index fea25ff..7167a0b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -257,7 +257,7 @@ public final class ExceptionUtils {
 			throw (Error) t;
 		}
 		else {
-			throw new IOException(t);
+			throw new IOException(t.getMessage(), t);
 		}
 	}
 
@@ -268,7 +268,7 @@ public final class ExceptionUtils {
 	 * @param searchType the type of exception to search for in the chain.
 	 * @return True, if the searched type is nested in the throwable, false otherwise.
 	 */
-	public static boolean containsThrowable(Throwable throwable, Class searchType) {
+	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
 		if (throwable == null || searchType == null) {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c1c65b5..6da6f7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -758,13 +759,19 @@ public class CheckpointCoordinator {
 		CompletedCheckpoint completedCheckpoint = null;
 
 		try {
-			completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+			// externalize the checkpoint if required
+			if (pendingCheckpoint.getProps().externalizeCheckpoint()) {
+				completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized();
+			} else {
+				completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized();
+			}
 
 			completedCheckpointStore.addCheckpoint(completedCheckpoint);
 
 			rememberRecentCheckpointId(checkpointId);
 			dropSubsumedCheckpoints(checkpointId);
-		} catch (Exception exception) {
+		}
+		catch (Exception exception) {
 			// abort the current pending checkpoint if it has not been discarded yet
 			if (!pendingCheckpoint.isDiscarded()) {
 				pendingCheckpoint.abortError(exception);
@@ -779,8 +786,8 @@ public class CheckpointCoordinator {
 					public void run() {
 						try {
 							cc.discard();
-						} catch (Exception nestedException) {
-							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+						} catch (Throwable t) {
+							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), t);
 						}
 					}
 				});
@@ -808,11 +815,12 @@ public class CheckpointCoordinator {
 				builder.append(", ");
 			}
 			// Remove last two chars ", "
-			builder.delete(builder.length() - 2, builder.length());
+			builder.setLength(builder.length() - 2);
 
 			LOG.debug(builder.toString());
 		}
 
+		// send the "notify complete" call to all vertices
 		final long timestamp = completedCheckpoint.getTimestamp();
 
 		for (ExecutionVertex ev : tasksToCommitTo) {
@@ -934,7 +942,7 @@ public class CheckpointCoordinator {
 					latest.getCheckpointID(),
 					latest.getProperties(),
 					restoreTimestamp,
-					latest.getExternalPath());
+					latest.getExternalPointer());
 
 				statsTracker.reportRestoredCheckpoint(restored);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index db86484..17ce4d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats.DiscardCallback;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +38,36 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
- * and that is considered completed.
+ * A CompletedCheckpoint describes a checkpoint for which all required tasks have acknowledged it (with
+ * their state) and which is therefore considered successful. The CompletedCheckpoint class contains all
+ * the metadata of the checkpoint, i.e., the checkpoint ID, timestamps, and the handles to all states
+ * that are part of the checkpoint.
+ * 
+ * <h2>Size of the CompletedCheckpoint Instances</h2>
+ * 
+ * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
+ * states are only pointers (such as file paths). However, some state backend implementations may
+ * choose to store some payload data directly with the metadata (for example to avoid many small files).
+ * If those inline-payload thresholds are set to large values, the memory consumption of the CompletedCheckpoint
+ * objects can be significant.
+ * 
+ * <h2>Externalized Metadata</h2>
+ * 
+ * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage
+ * system. In that case, the checkpoint is called <i>externalized</i>.
+ * 
+ * <p>Externalized checkpoints have an external pointer, which points to the metadata. For example
+ * when externalizing to a file system, that pointer is the file path to the checkpoint's folder
+ * or the metadata file. For a state backend that stores metadata in database tables, the pointer
+ * could be the table name and row key. The pointer is encoded as a String.
+ * 
+ * <h2>Externalized Metadata and High-availability</h2>
+ * 
+ * For high availability setups, the checkpoint metadata must be stored persistently and be available
+ * as well. The high-availability services that store the checkpoint ground truth (i.e., which
+ * checkpoints completed most recently, and in what order) often rely on checkpoints being externalized.
+ * That way, those services only store pointers to the externalized metadata, rather than the complete
+ * metadata itself (for example, a ZooKeeper ZNode payload should ideally be less than a megabyte).
  */
 public class CompletedCheckpoint implements Serializable {
 
@@ -44,8 +75,12 @@ public class CompletedCheckpoint implements Serializable {
 
 	private static final long serialVersionUID = -8360248179615702014L;
 
+	// ------------------------------------------------------------------------
+
+	/** The ID of the job that the checkpoint belongs to */
 	private final JobID job;
 
+	/** The ID (logical timestamp) of the checkpoint */
 	private final long checkpointID;
 
 	/** The timestamp when the checkpoint was triggered. */
@@ -60,23 +95,41 @@ public class CompletedCheckpoint implements Serializable {
 	/** Properties for this checkpoint. */
 	private final CheckpointProperties props;
 
-	/** External path if persisted checkpoint; <code>null</code> otherwise. */
-	private final String externalPath;
+	/** The state handle to the externalized meta data, if the metadata has been externalized */
+	@Nullable
+	private final StreamStateHandle externalizedMetadata;
+
+	/** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */
+	@Nullable
+	private final String externalPointer;
 
 	/** Optional stats tracker callback for discard. */
 	@Nullable
-	private transient CompletedCheckpointStats.DiscardCallback discardCallback;
+	private transient volatile DiscardCallback discardCallback;
 
 	// ------------------------------------------------------------------------
 
-	public CompletedCheckpoint(
+	@VisibleForTesting
+	CompletedCheckpoint(
 			JobID job,
 			long checkpointID,
 			long timestamp,
 			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates) {
 
-		this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null);
+		this(job, checkpointID, timestamp, completionTimestamp, taskStates,
+				CheckpointProperties.forStandardCheckpoint());
+	}
+
+	public CompletedCheckpoint(
+			JobID job,
+			long checkpointID,
+			long timestamp,
+			long completionTimestamp,
+			Map<JobVertexID, TaskState> taskStates,
+			CheckpointProperties props) {
+
+		this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null);
 	}
 
 	public CompletedCheckpoint(
@@ -86,24 +139,27 @@ public class CompletedCheckpoint implements Serializable {
 			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates,
 			CheckpointProperties props,
-			String externalPath) {
+			@Nullable StreamStateHandle externalizedMetadata,
+			@Nullable String externalPointer) {
 
 		checkArgument(checkpointID >= 0);
 		checkArgument(timestamp >= 0);
 		checkArgument(completionTimestamp >= 0);
 
+		checkArgument((externalPointer == null) == (externalizedMetadata == null),
+				"External pointer and externalized metadata must be either both null or both non-null");
+
+		checkArgument(!props.externalizeCheckpoint() || externalPointer != null, 
+			"Checkpoint properties require externalized checkpoint, but checkpoint is not externalized");
+
 		this.job = checkNotNull(job);
 		this.checkpointID = checkpointID;
 		this.timestamp = timestamp;
 		this.duration = completionTimestamp - timestamp;
 		this.taskStates = checkNotNull(taskStates);
 		this.props = checkNotNull(props);
-		this.externalPath = externalPath;
-
-		if (props.externalizeCheckpoint() && externalPath == null) {
-			throw new NullPointerException("Checkpoint properties say that the checkpoint " +
-					"should have been persisted, but missing external path.");
-		}
+		this.externalizedMetadata = externalizedMetadata;
+		this.externalPointer = externalPointer;
 	}
 
 	// ------------------------------------------------------------------------
@@ -146,10 +202,9 @@ public class CompletedCheckpoint implements Serializable {
 			discard();
 			return true;
 		} else {
-			if (externalPath != null) {
+			if (externalPointer != null) {
 				LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.",
-						checkpointID,
-						externalPath);
+						checkpointID, externalPointer);
 			}
 
 			return false;
@@ -158,14 +213,36 @@ public class CompletedCheckpoint implements Serializable {
 
 	void discard() throws Exception {
 		try {
-			if (externalPath != null) {
-				SavepointStore.removeSavepointFile(externalPath);
+			// collect exceptions and continue cleanup
+			Exception exception = null;
+
+			// drop the metadata, if we have some
+			if (externalizedMetadata != null) {
+				try {
+					externalizedMetadata.discardState();
+				}
+				catch (Exception e) {
+					exception = e;
+				}
 			}
 
-			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-		} finally {
+			// drop the actual state
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+			}
+			catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			if (exception != null) {
+				throw exception;
+			}
+		}
+		finally {
 			taskStates.clear();
 
+			// to be null-pointer safe, copy reference to stack
+			DiscardCallback discardCallback = this.discardCallback;
 			if (discardCallback != null) {
 				discardCallback.notifyDiscardedCheckpoint();
 			}
@@ -190,8 +267,18 @@ public class CompletedCheckpoint implements Serializable {
 		return taskStates.get(jobVertexID);
 	}
 
-	public String getExternalPath() {
-		return externalPath;
+	public boolean isExternalized() {
+		return externalizedMetadata != null;
+	}
+
+	@Nullable
+	public StreamStateHandle getExternalizedMetadata() {
+		return externalizedMetadata;
+	}
+
+	@Nullable
+	public String getExternalPointer() {
+		return externalPointer;
 	}
 
 	/**
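
A short usage sketch of the new accessors (variable names are illustrative; "pendingCheckpoint"
stands for a fully acknowledged PendingCheckpoint):

    CompletedCheckpoint completed = pendingCheckpoint.finalizeCheckpointExternalized();

    if (completed.isExternalized()) {
        // opaque pointer to the metadata: today a file/directory path, potentially a
        // table name plus row key for a database-backed state backend
        String pointer = completed.getExternalPointer();

        // handle to the serialized metadata itself; no longer assumed to be a file path
        StreamStateHandle metadataHandle = completed.getExternalizedMetadata();

        // an HA store can persist only 'pointer' instead of the full metadata
    }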

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index d2c0f6c..e91e038 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -72,4 +72,13 @@ public interface CompletedCheckpointStore {
 	 */
 	int getNumberOfRetainedCheckpoints();
 
+	/**
+	 * This method returns whether the completed checkpoint store requires checkpoints to be
+	 * externalized. Externalized checkpoints have their metadata persisted, which the checkpoint
+	 * store can exploit (for example by simply pointing to the persisted metadata).
+	 * 
+	 * @return True, if the store requires that checkpoints are externalized before being added, false
+	 *         if the store stores the metadata itself.
+	 */
+	boolean requiresExternalizedCheckpoints();
 }
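
As an illustration of how a store that keeps only pointers might use this flag, a hypothetical
guard in such a store's addCheckpoint() (a sketch, not part of this commit):

    @Override
    public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
        if (requiresExternalizedCheckpoints() && !checkpoint.isExternalized()) {
            throw new IllegalStateException(
                    "This store persists only pointers to checkpoint metadata, " +
                    "so checkpoints must be externalized before they are added.");
        }
        // persist checkpoint.getExternalPointer() in the HA backend (e.g. a ZooKeeper ZNode)
    }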

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 908ff7f..2c392b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -41,7 +44,10 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +80,7 @@ public class PendingCheckpoint {
 
 	/**
 	 * The checkpoint properties. If the checkpoint should be persisted
-	 * externally, it happens in {@link #finalizeCheckpoint()}.
+	 * externally, it happens in {@link #finalizeCheckpointExternalized()}.
 	 */
 	private final CheckpointProperties props;
 
@@ -203,46 +209,80 @@ public class PendingCheckpoint {
 		return onCompletionPromise;
 	}
 
-	public CompletedCheckpoint finalizeCheckpoint() {
+	public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
 		synchronized (lock) {
-			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
-
-			// Persist if required
-			String externalPath = null;
-			if (props.externalizeCheckpoint()) {
-				try {
-					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
-					externalPath = SavepointStore.storeSavepoint(
-							targetDirectory,
-							savepoint
-					);
-				} catch (IOException e) {
-					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
-				}
-			}
+			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
 
-			CompletedCheckpoint completed = new CompletedCheckpoint(
-					jobId,
-					checkpointId,
-					checkpointTimestamp,
-					System.currentTimeMillis(),
-					new HashMap<>(taskStates),
-					props,
-					externalPath);
+			// make sure we fulfill the promise with an exception if something fails
+			try {
+				// externalize the metadata
+				final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
 
-			onCompletionPromise.complete(completed);
+				// TEMP FIX - The savepoint store is currently strictly typed to file systems,
+				//            but checkpoint externalization is meant to be more generic. We need to work
+				//            with file handles here until the savepoint serializer accepts a generic stream factory.
 
-			if (statsCallback != null) {
-				// Finalize the statsCallback and give the completed checkpoint a
-				// callback for discards.
-				CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
-				completed.setDiscardCallback(discardCallback);
+				final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+				final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+
+				return finalizeInternal(metadataHandle, externalPointer);
+			}
+			catch (Throwable t) {
+				onCompletionPromise.completeExceptionally(t);
+				ExceptionUtils.rethrowIOException(t);
+				return null; // silence the compiler
+			}
+		}
+	}
+
+	public CompletedCheckpoint finalizeCheckpointNonExternalized() {
+		synchronized (lock) {
+			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
+			// make sure we fulfill the promise with an exception if something fails
+			try {
+				// finalize without external metadata
+				return finalizeInternal(null, null);
 			}
+			catch (Throwable t) {
+				onCompletionPromise.completeExceptionally(t);
+				ExceptionUtils.rethrow(t);
+				return null; // silence the compiler
+			}
+		}
+	}
 
-			dispose(false);
+	@GuardedBy("lock")
+	private CompletedCheckpoint finalizeInternal(
+			@Nullable StreamStateHandle externalMetadata,
+			@Nullable String externalPointer) {
 
-			return completed;
+		assert(Thread.holdsLock(lock));
+
+		CompletedCheckpoint completed = new CompletedCheckpoint(
+				jobId,
+				checkpointId,
+				checkpointTimestamp,
+				System.currentTimeMillis(),
+				new HashMap<>(taskStates),
+				props,
+				externalMetadata,
+				externalPointer);
+
+		onCompletionPromise.complete(completed);
+
+		if (statsCallback != null) {
+			// Finalize the statsCallback and give the completed checkpoint a
+			// callback for discards.
+			CompletedCheckpointStats.DiscardCallback discardCallback = 
+					statsCallback.reportCompletedCheckpoint(externalPointer);
+			completed.setDiscardCallback(discardCallback);
 		}
+
+		// mark this pending checkpoint as disposed, but do NOT drop the state
+		dispose(false);
+
+		return completed;
 	}
 
 	/**
@@ -411,9 +451,9 @@ public class PendingCheckpoint {
 						public void run() {
 							try {
 								StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-							} catch (Exception e) {
-								LOG.warn("Could not properly dispose the pending checkpoint " +
-									"{} of job {}.", checkpointId, jobId, e);
+							} catch (Throwable t) {
+								LOG.warn("Could not properly dispose the pending checkpoint {} of job {}.", 
+										checkpointId, jobId, t);
 							} finally {
 								taskStates.clear();
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 082bca9..a0248b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -96,4 +96,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 		}
 	}
 
+	@Override
+	public boolean requiresExternalizedCheckpoints() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index fdd0d40..4b03cea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -125,6 +125,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
+	@Override
+	public boolean requiresExternalizedCheckpoints() {
+		return true;
+	}
+
 	/**
 	 * Gets the latest checkpoint from ZooKeeper and removes all others.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 950a9a0..60f0287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,22 +48,27 @@ public class SavepointLoader {
 	 * @param jobId          The JobID of the job to load the savepoint for.
 	 * @param tasks          Tasks that will possibly be reset
 	 * @param savepointPath  The path of the savepoint to rollback to
-	 * @param userClassLoader The user code classloader
+	 * @param classLoader    The class loader to resolve serialized classes in legacy savepoint versions.
 	 * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
 	 * to any job vertex in tasks.
 	 *
 	 * @throws IllegalStateException If mismatch between program and savepoint state
-	 * @throws Exception             If savepoint store failure
+	 * @throws IOException             If the savepoint could not be loaded from the store
 	 */
 	public static CompletedCheckpoint loadAndValidateSavepoint(
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			String savepointPath,
-			ClassLoader userClassLoader,
+			ClassLoader classLoader,
 			boolean allowNonRestoredState) throws IOException {
 
 		// (1) load the savepoint
-		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
+		final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle = 
+				SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
+
+		final Savepoint savepoint = savepointAndHandle.f0;
+		final StreamStateHandle metadataHandle = savepointAndHandle.f1;
+
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
 
 		boolean expandedToLegacyIds = false;
@@ -114,10 +121,12 @@ public class SavepointLoader {
 
 		// (3) convert to checkpoint so the system can fall back to it
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L,
+				taskStates, props, metadataHandle, savepointPath);
 	}
 
 	// ------------------------------------------------------------------------
 
+	/** This class is not meant to be instantiated */
 	private SavepointLoader() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 95370a5..5c8ac6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -118,6 +121,28 @@ public class SavepointStore {
 	 * @throws IOException Failures during store are forwarded
 	 */
 	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+		// write and create the file handle
+		FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint);
+
+		// we return the savepoint directory path here!
+		// The directory path also works to resume from and is more elegant than the direct
+		// metadata file pointer
+		return metadataFileHandle.getFilePath().getParent().toString();
+	}
+
+	/**
+	 * Stores the savepoint metadata file to a state handle.
+	 *
+	 * @param directory Target directory to store savepoint in
+	 * @param savepoint Savepoint to be stored
+	 *                     
+	 * @return State handle to the checkpoint metadata
+	 * @throws IOException Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
+			String directory,
+			T savepoint) throws IOException {
+
 		checkNotNull(directory, "Target directory");
 		checkNotNull(savepoint, "Savepoint");
 
@@ -127,10 +152,9 @@ public class SavepointStore {
 		final FileSystem fs = FileSystem.get(basePath.toUri());
 
 		boolean success = false;
-		try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); 
+		try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
 				DataOutputStream dos = new DataOutputStream(fdos))
 		{
-
 			// Write header
 			dos.writeInt(MAGIC_NUMBER);
 			dos.writeInt(savepoint.getVersion());
@@ -138,7 +162,13 @@ public class SavepointStore {
 			// Write savepoint
 			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
 			serializer.serialize(savepoint, dos);
+
+			// construct result handle
+			FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size());
+
+			// all good!
 			success = true;
+			return handle;
 		}
 		finally {
 			if (!success && fs.exists(metadataFilePath)) {
@@ -147,22 +177,37 @@ public class SavepointStore {
 				}
 			}
 		}
-
-		// we return the savepoint directory path here!
-		// The directory path also works to resume from and is more elegant than the direct
-		// metadata file pointer
-		return basePath.toString();
 	}
 
 	/**
 	 * Loads the savepoint at the specified path.
 	 *
 	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
+	 * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
 	 * @return The loaded savepoint
+	 * 
 	 * @throws IOException Failures during load are forwarded
 	 */
-	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
-		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException {
+		return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0;
+	}
+
+	/**
+	 * Loads the savepoint at the specified path. This methods returns the savepoint, as well as the
+	 * handle to the metadata.
+	 *
+	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
+	 * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
+	 * @return The loaded savepoint
+	 *
+	 * @throws IOException Failures during load are forwarded
+	 */
+	public static Tuple2<Savepoint, StreamStateHandle> loadSavepointWithHandle(
+			String savepointFileOrDirectory,
+			ClassLoader classLoader) throws IOException {
+		
+		checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory");
+		checkNotNull(classLoader, "classLoader");
 
 		Path path = new Path(savepointFileOrDirectory);
 
@@ -180,11 +225,13 @@ public class SavepointStore {
 				LOG.info("Using savepoint file in {}", path);
 			} else {
 				throw new IOException("Cannot find meta data file in directory " + path
-					+ ". Please try to load the savepoint directly from the meta data file "
-					+ "instead of the directory.");
+						+ ". Please try to load the savepoint directly from the meta data file "
+						+ "instead of the directory.");
 			}
 		}
 
+		// load the savepoint
+		final Savepoint savepoint;
 		try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
 			int magicNumber = dis.readInt();
 
@@ -192,15 +239,27 @@ public class SavepointStore {
 				int version = dis.readInt();
 
 				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
-				return serializer.deserialize(dis, userClassLoader);
+				savepoint = serializer.deserialize(dis, classLoader);
 			} else {
-				throw new RuntimeException("Unexpected magic number. This is most likely " +
-						"caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
-						"savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
-						"_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
-						"file is not a proper savepoint or the file has been corrupted.");
+				throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " +
+						"(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
+						"version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
+						"(3) The savepoint file has been corrupted.");
 			}
 		}
+
+		// construct the stream handle to the metadata file
+		// we get the size best-effort
+		long size = 0;
+		try {
+			size = fs.getFileStatus(path).getLen();
+		}
+		catch (Exception ignored) {
+			// we don't know the size, but we don't want to fail the savepoint loading for that
+		}
+		StreamStateHandle metadataHandle = new FileStateHandle(path, size);
+
+		return new Tuple2<>(savepoint, metadataHandle);
 	}
 
 	/**
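
A minimal sketch of the handle-based store/load round trip added here (the target directory
"/checkpoints/job-42" and the checkpointId, taskStates, and classLoader variables are
assumptions for illustration):

    Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());

    // write the metadata file and get a handle to it
    FileStateHandle metadataHandle =
            SavepointStore.storeSavepointToHandle("/checkpoints/job-42", savepoint);

    // the parent directory of the metadata file doubles as the external pointer
    String externalPointer = metadataHandle.getFilePath().getParent().toString();

    // later: load both the savepoint and the handle to its metadata
    Tuple2<Savepoint, StreamStateHandle> loaded =
            SavepointStore.loadSavepointWithHandle(externalPointer, classLoader);
    Savepoint restored = loaded.f0;
    StreamStateHandle metadataStateHandle = loaded.f1;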

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index c6f5c86..b250831 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 
 import java.util.concurrent.RunnableFuture;
@@ -42,26 +43,22 @@ public class StateUtil {
 			Iterable<? extends StateObject> handlesToDiscard) throws Exception {
 
 		if (handlesToDiscard != null) {
-
-			Exception suppressedExceptions = null;
+			Exception exception = null;
 
 			for (StateObject state : handlesToDiscard) {
 
 				if (state != null) {
 					try {
 						state.discardState();
-					} catch (Exception ex) {
-						//best effort to still cleanup other states and deliver exceptions in the end
-						if (suppressedExceptions == null) {
-							suppressedExceptions = new Exception(ex);
-						}
-						suppressedExceptions.addSuppressed(ex);
+					}
+					catch (Exception ex) {
+						exception = ExceptionUtils.firstOrSuppressed(ex, exception);
 					}
 				}
 			}
 
-			if (suppressedExceptions != null) {
-				throw suppressedExceptions;
+			if (exception != null) {
+				throw exception;
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 21749cb..87cd4ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.net._
 import java.util.UUID
 import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -77,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 import org.jboss.netty.channel.ChannelException
 
@@ -611,7 +611,7 @@ class JobManager(
                 new BiFunction[CompletedCheckpoint, Throwable, Void] {
                   override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
                     if (success != null) {
-                      val path = success.getExternalPath()
+                      val path = success.getExternalPointer()
                       log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
                       executionGraph.cancel()
                       senderRef ! decorateMessage(CancellationSuccess(jobId, path))
@@ -787,11 +787,11 @@ class JobManager(
                 new BiFunction[CompletedCheckpoint, Throwable, Void] {
                   override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
                     if (success != null) {
-                      if (success.getExternalPath != null) {
+                      if (success.getExternalPointer != null) {
                         senderRef ! TriggerSavepointSuccess(
                           jobId,
                           success.getCheckpointID,
-                          success.getExternalPath,
+                          success.getExternalPointer,
                           success.getTimestamp
                         )
                       } else {
@@ -1784,7 +1784,7 @@ class JobManager(
       case t: Throwable =>
         log.error(s"Could not properly unregister job $jobID form the library cache.", t)
     }
-    jobManagerMetricGroup.map(_.removeJob(jobID))
+    jobManagerMetricGroup.foreach(_.removeJob(jobID))
 
     futureOption
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d4c3a2d..9517257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -134,5 +134,10 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		public int getNumberOfRetainedCheckpoints() {
 			return -1;
 		}
+
+		@Override
+		public boolean requiresExternalizedCheckpoints() {
+			return false;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 725b85f..f77c755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -237,7 +237,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			Map<JobVertexID, TaskState> taskGroupStates,
 			CheckpointProperties props) {
 
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props, null);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0d933ff..b34e9a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -55,7 +57,9 @@ public class CompletedCheckpointTest {
 
 		// Verify discard call is forwarded to state
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-				new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath());
+				new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(),
+				new FileStateHandle(new Path(file.toURI()), file.length()),
+				file.getAbsolutePath());
 
 		checkpoint.discard(JobStatus.FAILED);
 
@@ -74,7 +78,7 @@ public class CompletedCheckpointTest {
 		boolean discardSubsumed = true;
 		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-				new JobID(), 0, 0, 1, taskStates, props, null);
+				new JobID(), 0, 0, 1, taskStates, props);
 
 		// Subsume
 		checkpoint.subsume();
@@ -104,7 +108,9 @@ public class CompletedCheckpointTest {
 			// Keep
 			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, externalPath);
+					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props,
+					new FileStateHandle(new Path(file.toURI()), file.length()),
+					externalPath);
 
 			checkpoint.discard(status);
 			verify(state, times(0)).discardState();
@@ -113,7 +119,7 @@ public class CompletedCheckpointTest {
 			// Discard
 			props = new CheckpointProperties(false, false, true, true, true, true, true);
 			checkpoint = new CompletedCheckpoint(
-					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, null);
+					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props);
 
 			checkpoint.discard(status);
 			verify(state, times(1)).discardState();
@@ -135,8 +141,7 @@ public class CompletedCheckpointTest {
 			0,
 			1,
 			new HashMap<>(taskStates),
-			CheckpointProperties.forStandardCheckpoint(),
-			null);
+			CheckpointProperties.forStandardCheckpoint());
 
 		CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
 		completed.setDiscardCallback(callback);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 3a85c4c..6f04f39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -49,9 +51,6 @@ import static org.mockito.Mockito.verify;
 
 public class PendingCheckpointTest {
 
-	@Rule
-	public TemporaryFolder tmpFolder = new TemporaryFolder();
-
 	private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
 	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
 
@@ -59,6 +58,9 @@ public class PendingCheckpointTest {
 		ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
 	}
 
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	/**
 	 * Tests that pending checkpoints can be subsumed iff they are forced.
 	 */
@@ -96,7 +98,7 @@ public class PendingCheckpointTest {
 		PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
 		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
 		assertEquals(0, tmp.listFiles().length);
-		pending.finalizeCheckpoint();
+		pending.finalizeCheckpointExternalized();
 		assertEquals(1, tmp.listFiles().length);
 
 		// Ephemeral checkpoint
@@ -105,7 +107,7 @@ public class PendingCheckpointTest {
 		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
 
 		assertEquals(1, tmp.listFiles().length);
-		pending.finalizeCheckpoint();
+		pending.finalizeCheckpointNonExternalized();
 		assertEquals(1, tmp.listFiles().length);
 	}
 
@@ -148,7 +150,8 @@ public class PendingCheckpointTest {
 
 		assertFalse(future.isDone());
 		pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
-		pending.finalizeCheckpoint();
+		assertTrue(pending.isFullyAcknowledged());
+		pending.finalizeCheckpointExternalized();
 		assertTrue(future.isDone());
 
 		// Finalize (missing ACKs)
@@ -157,7 +160,13 @@ public class PendingCheckpointTest {
 
 		assertFalse(future.isDone());
 		try {
-			pending.finalizeCheckpoint();
+			pending.finalizeCheckpointNonExternalized();
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+		try {
+			pending.finalizeCheckpointExternalized();
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException ignored) {
 			// Expected
@@ -233,7 +242,7 @@ public class PendingCheckpointTest {
 			pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
 			verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class));
 
-			pending.finalizeCheckpoint();
+			pending.finalizeCheckpointNonExternalized();
 			verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5a38be2..cbb077c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -483,6 +483,10 @@ public class JobManagerHARecoveryTest {
 			return checkpoints.size();
 		}
 
+		@Override
+		public boolean requiresExternalizedCheckpoints() {
+			return false;
+		}
 	}
 
 	static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 60b12d2..75f1fd4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -979,7 +979,7 @@ class JobManagerITCase(_system: ActorSystem)
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor)
 
           val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
-          when(checkpoint.getExternalPath).thenReturn("Expected test savepoint path")
+          when(checkpoint.getExternalPointer).thenReturn("Expected test savepoint path")
 
           // Succeed the promise
           savepointPromise.complete(checkpoint)

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 60a3a62..f910e49 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -167,7 +167,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
 
 		ActorSystem testSystem = null;
-		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
+		final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
 		LeaderRetrievalService leaderRetrievalService = null;
 		ActorSystem taskManagerSystem = null;