You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/14 08:06:02 UTC

[4/7] flink git commit: [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
index faeac34..d21349c 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
@@ -50,6 +50,10 @@ limitations under the License.
         <p><strong>Average:</strong><span> {{ jobCheckpointStats['size']['avg'] | humanizeBytes }}</span></p>
       </td>
     </tr>
+    <tr ng-if="jobCheckpointStats['external-path']">
+      <td colspan="4"><strong>Latest Checkpoint Path:</strong> {{ jobCheckpointStats['external-path'] }}
+      </td>
+    </tr>
   </tbody>
 </table>
 <div ng-if="!showHistory &amp;&amp; jobCheckpointStats &amp;&amp; jobCheckpointStats['history'].length &gt; 0"><a ng-click="toggleHistory()" class="btn btn-default"><strong>Show history</strong> ({{ jobCheckpointStats['history'].length }}) <i class="fa fa-chevron-down"></i></a></div>

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e95afe0..ab4bde7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -43,7 +46,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -70,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointCoordinator {
 
-	protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
 
 	/** The number of recent checkpoints whose IDs are remembered */
 	private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
@@ -106,9 +108,9 @@ public class CheckpointCoordinator {
 	 * accessing this don't block the job manager actor and run asynchronously. */
 	private final CompletedCheckpointStore completedCheckpointStore;
 
-	/** Store for savepoints. */
-	private final SavepointStore savepointStore;
-	
+	/** Default directory for persistent checkpoints; <code>null</code> if none configured. */
+	private final String checkpointDirectory;
+
 	/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
 	private final ArrayDeque<Long> recentPendingCheckpoints;
 
@@ -157,6 +159,9 @@ public class CheckpointCoordinator {
 	/** Helper for tracking checkpoint statistics  */
 	private final CheckpointStatsTracker statsTracker;
 
+	/** Default checkpoint properties */
+	private final CheckpointProperties checkpointProperties;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -165,12 +170,13 @@ public class CheckpointCoordinator {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpointAttempts,
+			ExternalizedCheckpointSettings externalizeSettings,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
-			SavepointStore savepointStore,
+			String checkpointDirectory,
 			CheckpointStatsTracker statsTracker) {
 
 		// sanity checks
@@ -179,6 +185,12 @@ public class CheckpointCoordinator {
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 
+		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
+			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
+					"checkpoints, but no checkpoint directory has been configured. You can " +
+					"configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
+		}
+
 		// it does not make sense to schedule checkpoints more often then the desired
 		// time between checkpoints
 		if (baseInterval < minPauseBetweenCheckpoints) {
@@ -196,12 +208,19 @@ public class CheckpointCoordinator {
 		this.pendingCheckpoints = new LinkedHashMap<>();
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
-		this.savepointStore = checkNotNull(savepointStore);
+		this.checkpointDirectory = checkpointDirectory;
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.statsTracker = checkNotNull(statsTracker);
 
 		this.timer = new Timer("Checkpoint Timer", true);
 
+		if (externalizeSettings.externalizeCheckpoints()) {
+			LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
+			checkpointProperties = CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation());
+		} else {
+			checkpointProperties = CheckpointProperties.forStandardCheckpoint();
+		}
+
 		try {
 			// Make sure the checkpoint ID enumerator is running. Possibly
 			// issues a blocking call to ZooKeeper.
@@ -219,33 +238,9 @@ public class CheckpointCoordinator {
 	 * Shuts down the checkpoint coordinator.
 	 *
 	 * <p>After this method has been called, the coordinator does not accept
-	 * and further messages and cannot trigger any further checkpoints. All
-	 * checkpoint state is discarded.
-	 */
-	public void shutdown() throws Exception {
-		shutdown(true);
-	}
-
-	/**
-	 * Suspends the checkpoint coordinator.
-	 *
-	 * <p>After this method has been called, the coordinator does not accept
 	 * and further messages and cannot trigger any further checkpoints.
-	 *
-	 * <p>The difference to shutdown is that checkpoint state in the store
-	 * and counter is kept around if possible to recover later.
 	 */
-	public void suspend() throws Exception {
-		shutdown(false);
-	}
-
-	/**
-	 * Shuts down the checkpoint coordinator.
-	 *
-	 * @param shutdownStoreAndCounter Depending on this flag the checkpoint
-	 * state services are shut down or suspended.
-	 */
-	private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
+	public void shutdown(JobStatus jobStatus) throws Exception {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
@@ -263,13 +258,8 @@ public class CheckpointCoordinator {
 				}
 				pendingCheckpoints.clear();
 
-				if (shutdownStoreAndCounter) {
-					completedCheckpointStore.shutdown();
-					checkpointIdCounter.shutdown();
-				} else {
-					completedCheckpointStore.suspend();
-					checkpointIdCounter.suspend();
-				}
+				completedCheckpointStore.shutdown(jobStatus);
+				checkpointIdCounter.shutdown(jobStatus);
 			}
 		}
 	}
@@ -282,29 +272,49 @@ public class CheckpointCoordinator {
 	//  Handling checkpoints and messages
 	// --------------------------------------------------------------------------------------------
 
-	public Future<String> triggerSavepoint(long timestamp) throws Exception {
-		CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+	/**
+	 * Triggers a savepoint with the given savepoint directory as a target.
+	 *
+	 * @param timestamp The timestamp for the savepoint.
+	 * @param targetDirectory Target directory for the savepoint.
+	 * @return A future to the completed checkpoint
+	 * @throws IllegalStateException If no savepoint directory has been
+	 *                               specified and no default savepoint directory has been
+	 *                               configured
+	 * @throws Exception             Failures during triggering are forwarded
+	 */
+	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
+		checkNotNull(targetDirectory, "Savepoint target directory");
+
+		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
 
 		if (result.isSuccess()) {
-			PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-			return savepoint.getCompletionFuture();
-		}
-		else {
-			return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+			return result.getPendingCheckpoint().getCompletionFuture();
+		} else {
+			Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
+			Future<CompletedCheckpoint> failed = FlinkCompletableFuture.completedExceptionally(cause);
+			return failed;
 		}
 	}
 
 	/**
-	 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
+	 * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
 	 * timestamp.
 	 *
 	 * @param timestamp The timestamp for the checkpoint.
+	 * @return <code>true</code> if triggering the checkpoint succeeded.
 	 */
 	public boolean triggerCheckpoint(long timestamp) throws Exception {
-		return triggerCheckpoint(timestamp, CheckpointProperties.forStandardCheckpoint()).isSuccess();
+		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess();
 	}
 
-	CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props) throws Exception {
+	CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception {
+		// Sanity check
+		if (props.externalizeCheckpoint() && targetDirectory == null) {
+			throw new IllegalStateException("No target directory specified to persist checkpoint to.");
+		}
+
 		// make some eager pre-checks
 		synchronized (lock) {
 			// abort if the coordinator has been shutdown in the meantime
@@ -315,7 +325,7 @@ public class CheckpointCoordinator {
 			// validate whether the checkpoint can be triggered, with respect to the limit of
 			// concurrent checkpoints, and the minimum time between checkpoints.
 			// these checks are not relevant for savepoints
-			if (!props.isSavepoint()) {
+			if (!props.forceCheckpoint()) {
 				// sanity check: there should never be more than one trigger request queued
 				if (triggerRequestQueued) {
 					LOG.warn("Trying to trigger another checkpoint while one was queued already");
@@ -402,9 +412,13 @@ public class CheckpointCoordinator {
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
-			final PendingCheckpoint checkpoint = props.isSavepoint() ?
-					new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
-					new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+			final PendingCheckpoint checkpoint = new PendingCheckpoint(
+					job,
+					checkpointID,
+					timestamp,
+					ackTasks,
+					props,
+					targetDirectory);
 
 			// schedule the timer that will clean up the expired checkpoints
 			TimerTask canceller = new TimerTask() {
@@ -439,7 +453,7 @@ public class CheckpointCoordinator {
 					if (shutdown) {
 						return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
 					}
-					else if (!props.isSavepoint()) {
+					else if (!props.forceCheckpoint()) {
 						if (triggerRequestQueued) {
 							LOG.warn("Trying to trigger another checkpoint while one was queued already");
 							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
@@ -566,7 +580,7 @@ public class CheckpointCoordinator {
 				}
 				if (!haveMoreRecentPending && !triggerRequestQueued) {
 					LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
-					triggerCheckpoint(System.currentTimeMillis());
+					triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory());
 				} else if (!haveMoreRecentPending) {
 					LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
 					triggerQueuedRequests();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 76af4be..48cec7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -18,29 +18,27 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 /**
  * A checkpoint ID counter.
  */
 public interface CheckpointIDCounter {
 
 	/**
-	 * Starts the {@link CheckpointIDCounter} service.
+	 * Starts the {@link CheckpointIDCounter} service.
 	 */
 	void start() throws Exception;
 
 	/**
-	 * Shuts the {@link CheckpointIDCounter} service down and frees all created
-	 * resources.
-	 */
-	void shutdown() throws Exception;
-
-	/**
-	 * Suspends the counter.
+	 * Shuts the {@link CheckpointIDCounter} service down.
+	 *
+	 * <p>The job status is forwarded and used to decide whether state should
+	 * actually be discarded or kept.
 	 *
-	 * <p>If the implementation allows recovery, the counter state needs to be
-	 * kept. Otherwise, this acts as shutdown.
+	 * @param jobStatus Job state on shut down
 	 */
-	void suspend() throws Exception;
+	void shutdown(JobStatus jobStatus) throws Exception;
 
 	/**
 	 * Atomically increments the current checkpoint ID.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 7ea645a..e4856cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -18,44 +18,252 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * <ul>
- *     <li>The checkpoint is a savepoint</li>
- *     <li>The checkpoint must be full, or may be incremental</li>
- *     <li>The checkpoint format must be the common (cross backend) format, or may be state-backend specific</li>
+ *     <li>The checkpoint should be persisted</li>
+ *     <li>The checkpoint must be full, or may be incremental (not yet implemented)</li>
+ *     <li>The checkpoint format must be the common (cross backend) format,
+ *     or may be state-backend specific (not yet implemented)</li>
+ *     <li>When the checkpoint should be garbage collected</li>
  * </ul>
  */
-public class CheckpointProperties {
+public class CheckpointProperties implements Serializable {
+
+	private static final long serialVersionUID = -8835900655844879469L;
+
+	private final boolean forced;
+
+	private final boolean externalize;
 
-	private final boolean isSavepoint;
+	private final boolean discardSubsumed;
+	private final boolean discardFinished;
+	private final boolean discardCancelled;
+	private final boolean discardFailed;
+	private final boolean discardSuspended;
 
-	private CheckpointProperties(boolean isSavepoint) {
-		this.isSavepoint = isSavepoint;
+	CheckpointProperties(
+			boolean forced,
+			boolean externalize,
+			boolean discardSubsumed,
+			boolean discardFinished,
+			boolean discardCancelled,
+			boolean discardFailed,
+			boolean discardSuspended) {
+
+		this.forced = forced;
+		this.externalize = externalize;
+		this.discardSubsumed = discardSubsumed;
+		this.discardFinished = discardFinished;
+		this.discardCancelled = discardCancelled;
+		this.discardFailed = discardFailed;
+		this.discardSuspended = discardSuspended;
+
+		// Not persisted, but needs manual clean up
+		if (!externalize && !(discardSubsumed && discardFinished && discardCancelled
+				&& discardFailed && discardSuspended)) {
+			throw new IllegalStateException("CheckpointProperties say to *not* persist the " +
+					"checkpoint, but the checkpoint requires manual cleanup.");
+		}
 	}
 
 	// ------------------------------------------------------------------------
 
-	public boolean isSavepoint() {
-		return isSavepoint;
+	/**
+	 * Returns whether the checkpoint should be forced.
+	 *
+	 * <p>Forced checkpoints ignore the configured maximum number of concurrent
+	 * checkpoints and minimum time between checkpoints. Furthermore, they are
+	 * not subsumed by more recent checkpoints as long as they are pending.
+	 *
+	 * @return <code>true</code> if the checkpoint should be forced;
+	 * <code>false</code> otherwise.
+	 *
+	 * @see CheckpointCoordinator
+	 * @see PendingCheckpoint
+	 */
+	public boolean forceCheckpoint() {
+		return forced;
+	}
+
+	/**
+	 * Returns whether the checkpoint should be persisted externally.
+	 *
+	 * @return <code>true</code> if the checkpoint should be persisted
+	 * externally; <code>false</code> otherwise.
+	 *
+	 * @see PendingCheckpoint
+	 */
+	public boolean externalizeCheckpoint() {
+		return externalize;
 	}
 
 	// ------------------------------------------------------------------------
+	// Garbage collection behaviour
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns whether the checkpoint should be discarded when it is subsumed.
+	 *
+	 * <p>A checkpoint is subsumed when the maximum number of retained
+	 * checkpoints is reached and a more recent checkpoint completes.
+	 *
+	 * @return <code>true</code> if the checkpoint should be discarded when it
+	 * is subsumed; <code>false</code> otherwise.
+	 *
+	 * @see CompletedCheckpointStore
+	 */
+	public boolean discardOnSubsumed() {
+		return discardSubsumed;
+	}
+
+	/**
+	 * Returns whether the checkpoint should be discarded when the owning job
+	 * reaches the {@link JobStatus#FINISHED} state.
+	 *
+	 * @return <code>true</code> if the checkpoint should be discarded when the
+	 * owning job reaches the {@link JobStatus#FINISHED} state; <code>false</code>
+	 * otherwise.
+	 *
+	 * @see CompletedCheckpointStore
+	 */
+	public boolean discardOnJobFinished() {
+		return discardFinished;
+	}
+
+	/**
+	 * Returns whether the checkpoint should be discarded when the owning job
+	 * reaches the {@link JobStatus#CANCELED} state.
+	 *
+	 * @return <code>true</code> if the checkpoint should be discarded when the
+	 * owning job reaches the {@link JobStatus#CANCELED} state; <code>false</code>
+	 * otherwise.
+	 *
+	 * @see CompletedCheckpointStore
+	 */
+	public boolean discardOnJobCancelled() {
+		return discardCancelled;
+	}
+
+	/**
+	 * Returns whether the checkpoint should be discarded when the owning job
+	 * reaches the {@link JobStatus#FAILED} state.
+	 *
+	 * @return <code>true</code> if the checkpoint should be discarded when the
+	 * owning job reaches the {@link JobStatus#FAILED} state; <code>false</code>
+	 * otherwise.
+	 *
+	 * @see CompletedCheckpointStore
+	 */
+	public boolean discardOnJobFailed() {
+		return discardFailed;
+	}
+
+	/**
+	 * Returns whether the checkpoint should be discarded when the owning job
+	 * reaches the {@link JobStatus#SUSPENDED} state.
+	 *
+	 * @return <code>true</code> if the checkpoint should be discarded when the
+	 * owning job reaches the {@link JobStatus#SUSPENDED} state; <code>false</code>
+	 * otherwise.
+	 *
+	 * @see CompletedCheckpointStore
+	 */
+	public boolean discardOnJobSuspended() {
+		return discardSuspended;
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		CheckpointProperties that = (CheckpointProperties) o;
+		return forced == that.forced &&
+				externalize == that.externalize &&
+				discardSubsumed == that.discardSubsumed &&
+				discardFinished == that.discardFinished &&
+				discardCancelled == that.discardCancelled &&
+				discardFailed == that.discardFailed &&
+				discardSuspended == that.discardSuspended;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (forced ? 1 : 0);
+		result = 31 * result + (externalize ? 1 : 0);
+		result = 31 * result + (discardSubsumed ? 1 : 0);
+		result = 31 * result + (discardFinished ? 1 : 0);
+		result = 31 * result + (discardCancelled ? 1 : 0);
+		result = 31 * result + (discardFailed ? 1 : 0);
+		result = 31 * result + (discardSuspended ? 1 : 0);
+		return result;
+	}
 
 	@Override
 	public String toString() {
-		return "CheckpointProperties {" +
-				"isSavepoint=" + isSavepoint +
+		return "CheckpointProperties{" +
+				"forced=" + forced +
+				", externalize=" + externalizeCheckpoint() +
+				", discardSubsumed=" + discardSubsumed +
+				", discardFinished=" + discardFinished +
+				", discardCancelled=" + discardCancelled +
+				", discardFailed=" + discardFailed +
+				", discardSuspended=" + discardSuspended +
 				'}';
 	}
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Creates the checkpoint properties for a (manually triggered) savepoint.
+	 *
+	 * <p>Savepoints are forced and persisted externally. They have to be
+	 * garbage collected manually.
+	 *
+	 * @return Checkpoint properties for a (manually triggered) savepoint.
+	 */
 	public static CheckpointProperties forStandardSavepoint() {
-		return new CheckpointProperties(true);
+		return new CheckpointProperties(true, true, false, false, false, false, false);
 	}
 
+	/**
+	 * Creates the checkpoint properties for a regular checkpoint.
+	 *
+	 * <p>Regular checkpoints are not forced and not persisted externally. They
+	 * are garbage collected automatically.
+	 *
+	 * @return Checkpoint properties for a regular checkpoint.
+	 */
 	public static CheckpointProperties forStandardCheckpoint() {
-		return new CheckpointProperties(false);
+		return new CheckpointProperties(false, false, true, true, true, true, true);
+	}
+
+	/**
+	 * Creates the checkpoint properties for an external checkpoint.
+	 *
+	 * <p>External checkpoints are not forced, but persisted externally. They
+	 * are garbage collected automatically, except when the owning job
+	 * terminates in state {@link JobStatus#FAILED}. The user is required to
+	 * configure the clean up behaviour on job cancellation.
+	 *
+	 * @param deleteOnCancellation Flag indicating whether to discard on cancellation.
+	 *
+	 * @return Checkpoint properties for an external checkpoint.
+	 */
+	public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
+		return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 0d279f1..e135272 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -19,11 +19,15 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
 
@@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
  * and that is considered completed.
  */
-public class CompletedCheckpoint implements StateObject {
+public class CompletedCheckpoint implements Serializable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class);
 
 	private static final long serialVersionUID = -8360248179615702014L;
 
@@ -51,9 +57,11 @@ public class CompletedCheckpoint implements StateObject {
 	/** States of the different task groups belonging to this checkpoint */
 	private final Map<JobVertexID, TaskState> taskStates;
 
-	/** Flag to indicate whether the completed checkpoint data should be deleted when this
-	 * handle to the checkpoint is disposed */
-	private final boolean deleteStateWhenDisposed;
+	/** Properties for this checkpoint. */
+	private final CheckpointProperties props;
+
+	/** External path if persisted checkpoint; <code>null</code> otherwise. */
+	private final String externalPath;
 
 	// ------------------------------------------------------------------------
 
@@ -63,7 +71,8 @@ public class CompletedCheckpoint implements StateObject {
 			long timestamp,
 			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates,
-			boolean deleteStateWhenDisposed) {
+			CheckpointProperties props,
+			String externalPath) {
 
 		checkArgument(checkpointID >= 0);
 		checkArgument(timestamp >= 0);
@@ -74,7 +83,13 @@ public class CompletedCheckpoint implements StateObject {
 		this.timestamp = timestamp;
 		this.duration = completionTimestamp - timestamp;
 		this.taskStates = checkNotNull(taskStates);
-		this.deleteStateWhenDisposed = deleteStateWhenDisposed;
+		this.props = checkNotNull(props);
+		this.externalPath = externalPath;
+
+		if (props.externalizeCheckpoint() && externalPath == null) {
+			throw new NullPointerException("Checkpoint properties say that the checkpoint " +
+					"should have been persisted, but missing external path.");
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -95,19 +110,50 @@ public class CompletedCheckpoint implements StateObject {
 		return duration;
 	}
 
-	@Override
-	public void discardState() throws Exception {
-		if (deleteStateWhenDisposed) {
+	public CheckpointProperties getProperties() {
+		return props;
+	}
 
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-			} finally {
-				taskStates.clear();
+	public boolean subsume() throws Exception {
+		if (props.discardOnSubsumed()) {
+			discard();
+			return true;
+		}
+
+		return false;
+	}
+
+	public boolean discard(JobStatus jobStatus) throws Exception {
+		if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
+				jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() ||
+				jobStatus == JobStatus.FAILED && props.discardOnJobFailed() ||
+				jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) {
+
+			discard();
+			return true;
+		} else {
+			if (externalPath != null) {
+				LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.",
+						checkpointID,
+						externalPath);
 			}
+
+			return false;
+		}
+	}
+
+	private void discard() throws Exception {
+		try {
+			if (externalPath != null) {
+				SavepointStore.removeSavepoint(externalPath);
+			}
+
+			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+		} finally {
+			taskStates.clear();
 		}
 	}
 
-	@Override
 	public long getStateSize() throws IOException {
 		long result = 0L;
 
@@ -126,6 +172,10 @@ public class CompletedCheckpoint implements StateObject {
 		return taskStates.get(jobVertexID);
 	}
 
+	public String getExternalPath() {
+		return externalPath;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -153,4 +203,5 @@ public class CompletedCheckpoint implements StateObject {
 	public String toString() {
 		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index c52fc25..d2c0f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 import java.util.List;
 
 /**
@@ -38,7 +40,7 @@ public interface CompletedCheckpointStore {
 	 *
 	 * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
 	 * retained checkpoints, the oldest one will be discarded via {@link
-	 * CompletedCheckpoint#discard(ClassLoader)}.
+	 * CompletedCheckpoint#discard()}.
 	 */
 	void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
 
@@ -49,17 +51,14 @@ public interface CompletedCheckpointStore {
 	CompletedCheckpoint getLatestCheckpoint() throws Exception;
 
 	/**
-	 * Shuts down the store and discards all checkpoint instances.
-	 */
-	void shutdown() throws Exception;
-
-	/**
-	 * Suspends the store.
+	 * Shuts down the store.
+	 *
+	 * <p>The job status is forwarded and used to decide whether state should
+	 * actually be discarded or kept.
 	 *
-	 * <p>If the implementation allows recovery, checkpoint state needs to be
-	 * kept around. Otherwise, this should act like shutdown.
+	 * @param jobStatus Job state on shut down
 	 */
-	void suspend() throws Exception;
+	void shutdown(JobStatus jobStatus) throws Exception;
 
 	/**
 	 * Returns all {@link CompletedCheckpoint} instances.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2ca9d69..983f1d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -28,6 +33,8 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -46,6 +53,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PendingCheckpoint {
 
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+
 	private final Object lock = new Object();
 
 	private final JobID jobId;
@@ -58,7 +67,17 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
-	private final boolean disposeWhenSubsumed;
+	/**
+	 * The checkpoint properties. If the checkpoint should be persisted
+	 * externally, it happens in {@link #finalizeCheckpoint()}.
+	 */
+	private final CheckpointProperties props;
+
+	/** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
+	private final String targetDirectory;
+
+	/** The promise to fulfill once the checkpoint has been completed. */
+	private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>();
 
 	private int numAcknowledgedTasks;
 
@@ -70,23 +89,21 @@ public class PendingCheckpoint {
 			JobID jobId,
 			long checkpointId,
 			long checkpointTimestamp,
-			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm) {
-		this(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, true);
-	}
-
-	PendingCheckpoint(
-			JobID jobId,
-			long checkpointId,
-			long checkpointTimestamp,
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
-			boolean disposeWhenSubsumed)
-	{
+			CheckpointProperties props,
+			String targetDirectory) {
 		this.jobId = checkNotNull(jobId);
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
-		this.disposeWhenSubsumed = disposeWhenSubsumed;
 		this.taskStates = new HashMap<>();
+		this.props = checkNotNull(props);
+		this.targetDirectory = targetDirectory;
+
+		// Sanity check
+		if (props.externalizeCheckpoint() && targetDirectory == null) {
+			throw new NullPointerException("No target directory specified to persist checkpoint to.");
+		}
 
 		checkArgument(verticesToConfirm.size() > 0,
 				"Checkpoint needs at least one vertex that commits the checkpoint");
@@ -137,33 +154,71 @@ public class PendingCheckpoint {
 	 * @return True if the checkpoint can be subsumed, false otherwise.
 	 */
 	public boolean canBeSubsumed() {
-		return true;
+		// If the checkpoint is forced, it cannot be subsumed.
+		return !props.forceCheckpoint();
+	}
+
+	CheckpointProperties getProps() {
+		return props;
+	}
+
+	String getTargetDirectory() {
+		return targetDirectory;
 	}
 
 	// ------------------------------------------------------------------------
 	//  Progress and Completion
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Returns the completion future.
+	 *
+	 * @return A future to the completed checkpoint
+	 */
+	public Future<CompletedCheckpoint> getCompletionFuture() {
+		return onCompletionPromise;
+	}
+
 	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
 		synchronized (lock) {
-			if (discarded) {
-				throw new IllegalStateException("pending checkpoint is discarded");
-			}
-			if (notYetAcknowledgedTasks.isEmpty()) {
-				CompletedCheckpoint completed =  new CompletedCheckpoint(
-					jobId,
-					checkpointId,
-					checkpointTimestamp,
-					System.currentTimeMillis(),
-					new HashMap<>(taskStates),
-					disposeWhenSubsumed);
-
-				dispose(false);
-
-				return completed;
-			}
-			else {
-				throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
+			try {
+				if (discarded) {
+					throw new IllegalStateException("pending checkpoint is discarded");
+				}
+				if (notYetAcknowledgedTasks.isEmpty()) {
+					// Persist if required
+					String externalPath = null;
+					if (props.externalizeCheckpoint()) {
+						try {
+							Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+							externalPath = SavepointStore.storeSavepoint(
+									targetDirectory,
+									savepoint);
+						} catch (Throwable t) { // best effort: on failure the checkpoint still completes, with a null external path
+							LOG.error("Failed to persist checkpoint " + checkpointId + ".", t);
+						}
+					}
+
+					CompletedCheckpoint completed = new CompletedCheckpoint(
+							jobId,
+							checkpointId,
+							checkpointTimestamp,
+							System.currentTimeMillis(),
+							new HashMap<>(taskStates),
+							props,
+							externalPath);
+
+					onCompletionPromise.complete(completed); // notify waiters before the pending checkpoint is disposed
+
+					dispose(false);
+
+					return completed;
+				} else {
+					throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
+				}
+			} catch (Throwable t) {
+				onCompletionPromise.completeExceptionally(t); // make sure waiters on the completion future see the failure
+				throw t;
 			}
 		}
 	}
@@ -180,7 +235,6 @@ public class PendingCheckpoint {
 			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
 
 			if (vertex != null) {
-
 				if (checkpointStateHandles != null) {
 					List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
 					ChainedStateHandle<StreamStateHandle> nonPartitionedState =
@@ -256,18 +310,36 @@ public class PendingCheckpoint {
 	 * Aborts a checkpoint because it expired (took too long).
 	 */
 	public void abortExpired() throws Exception {
-		dispose(true);
+		try {
+			onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing"));
+		} finally {
+			dispose(true);
+		}
 	}
 
 	/**
 	 * Aborts the pending checkpoint because a newer completed checkpoint subsumed it.
 	 */
 	public void abortSubsumed() throws Exception {
-		dispose(true);
+		try {
+			if (props.forceCheckpoint()) { // forced checkpoints report canBeSubsumed() == false, so reaching this is a bug
+				onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed"));
+
+				throw new IllegalStateException("Bug: forced checkpoints must never be subsumed");
+			} else {
+				onCompletionPromise.completeExceptionally(new Exception("Checkpoint has been subsumed"));
+			}
+		} finally { // always dispose, even when the bug check above throws
+			dispose(true);
+		}
 	}
 
 	public void abortDeclined() throws Exception {
-		dispose(true);
+		try {
+			onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)"));
+		} finally {
+			dispose(true);
+		}
 	}
 
 	/**
@@ -275,10 +347,14 @@ public class PendingCheckpoint {
 	 * @param cause The error's exception.
 	 */
 	public void abortError(Throwable cause) throws Exception {
-		dispose(true);
+		try {
+			onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+		} finally {
+			dispose(true);
+		}
 	}
 
-	protected void dispose(boolean releaseState) throws Exception {
+	private void dispose(boolean releaseState) throws Exception {
 		synchronized (lock) {
 			try {
 				discarded = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
deleted file mode 100644
index 0bb6a91..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.util.ExceptionUtils;
-import org.slf4j.Logger;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pending savepoint is like a pending checkpoint, but it additionally performs some
- * actions upon completion, like notifying the triggerer.
- */
-public class PendingSavepoint extends PendingCheckpoint {
-
-	private static final Logger LOG = CheckpointCoordinator.LOG;
-
-	private final SavepointStore store;
-
-	/** The promise to fulfill once the savepoint is complete */
-	private final Promise<String> onCompletionPromise;
-	
-	// --------------------------------------------------------------------------------------------
-
-	public PendingSavepoint(
-			JobID jobId,
-			long checkpointId,
-			long checkpointTimestamp,
-			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
-			SavepointStore store)
-	{
-		super(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, false);
-
-		this.store = checkNotNull(store);
-		this.onCompletionPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Savepoint completion
-	// --------------------------------------------------------------------------------------------
-
-	public Future<String> getCompletionFuture() {
-		return onCompletionPromise.future();
-	}
-	
-	@Override
-	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
-		// finalize checkpoint (this also disposes this pending checkpoint)
-		CompletedCheckpoint completedCheckpoint = super.finalizeCheckpoint();
-
-		// now store the checkpoint externally as a savepoint
-		try {
-			Savepoint savepoint = new SavepointV1(
-					completedCheckpoint.getCheckpointID(),
-					completedCheckpoint.getTaskStates().values());
-			
-			String path = store.storeSavepoint(savepoint);
-			onCompletionPromise.success(path);
-		}
-		catch (Throwable t) {
-			LOG.warn("Failed to store savepoint.", t);
-			onCompletionPromise.failure(t);
-
-			ExceptionUtils.rethrow(t, "Failed to store savepoint.");
-		}
-
-		return completedCheckpoint;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cancellation / Disposal
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean canBeSubsumed() {
-		return false;
-	}
-
-	@Override
-	public void abortSubsumed() throws Exception {
-		try {
-			Exception e = new Exception("Bug: Savepoints must never be subsumed");
-			onCompletionPromise.failure(e);
-			throw e;
-		}
-		finally {
-			dispose(true);
-		}
-	}
-
-	@Override
-	public void abortExpired() throws Exception {
-		try {
-			LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " expired before completing.");
-			onCompletionPromise.failure(new Exception("Savepoint expired before completing"));
-		}
-		finally {
-			dispose(true);
-		}
-	}
-
-	@Override
-	public void abortDeclined() throws Exception {
-		try {
-			LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " was declined (tasks not ready).");
-			onCompletionPromise.failure(new Exception("Savepoint was declined (tasks not ready)"));
-		}
-		finally {
-			dispose(true);
-		}
-	}
-
-	@Override
-	public void abortError(Throwable cause) throws Exception {
-		try {
-			LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " failed due to an error", cause);
-			onCompletionPromise.failure(
-					new Exception("Savepoint could not be completed: " + cause.getMessage(), cause));
-		}
-		finally {
-			dispose(true);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return String.format("Pending Savepoint %d @ %d - confirmed=%d, pending=%d",
-				getCheckpointId(), getCheckpointTimestamp(),
-				getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 84cbe19..e4ed996 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,10 +36,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 	public void start() throws Exception {}
 
 	@Override
-	public void shutdown() throws Exception {}
-
-	@Override
-	public void suspend() throws Exception {}
+	public void shutdown(JobStatus jobStatus) throws Exception {}
 
 	@Override
 	public long getAndIncrement() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index aecb51e..a9624fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -41,7 +41,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
 			throws Exception {
 
 		return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-				.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+				.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5e03988..082bca9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  */
 public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
+
 	/** The maximum number of checkpoints to retain (at least 1). */
 	private final int maxNumberOfCheckpointsToRetain;
 
@@ -44,16 +48,10 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
 	 *                                       least 1). Adding more checkpoints than this results
 	 *                                       in older checkpoints being discarded.
-	 * @param userClassLoader                The user class loader used to discard checkpoints
 	 */
-	public StandaloneCompletedCheckpointStore(
-			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader) {
-
+	public StandaloneCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) {
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
-
 		this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
-
 		this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 	}
 
@@ -64,9 +62,9 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-		checkpoints.addLast(checkpoint);
+		checkpoints.add(checkpoint);
 		if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-			checkpoints.removeFirst().discardState();
+			checkpoints.remove().subsume();
 		}
 	}
 
@@ -86,17 +84,16 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
-	public void shutdown() throws Exception {
+	public void shutdown(JobStatus jobStatus) throws Exception {
 		try {
-			StateUtil.bestEffortDiscardAllStateObjects(checkpoints);
+			LOG.info("Shutting down");
+
+			for (CompletedCheckpoint checkpoint : checkpoints) {
+				checkpoint.discard(jobStatus);
+			}
 		} finally {
 			checkpoints.clear();
 		}
 	}
 
-	@Override
-	public void suspend() throws Exception {
-		// Do a regular shutdown, because we can't recovery anything
-		shutdown();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 0bceb8b..80e79b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,30 +92,17 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
 	}
 
 	@Override
-	public void shutdown() throws Exception {
+	public void shutdown(JobStatus jobStatus) throws Exception {
 		synchronized (startStopLock) {
 			if (isStarted) {
 				LOG.info("Shutting down.");
 				sharedCount.close();
 				client.getConnectionStateListenable().removeListener(connStateListener);
 
-				LOG.info("Removing {} from ZooKeeper", counterPath);
-				client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
-
-				isStarted = false;
-			}
-		}
-	}
-
-	@Override
-	public void suspend() throws Exception {
-		synchronized (startStopLock) {
-			if (isStarted) {
-				LOG.info("Suspending.");
-				sharedCount.close();
-				client.getConnectionStateListenable().removeListener(connStateListener);
-
-				// Don't remove any state
+				if (jobStatus.isGloballyTerminalState()) {
+					LOG.info("Removing {} from ZooKeeper", counterPath);
+					client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+				}
 
 				isStarted = false;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 55a0bbb..f47012d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader);
+				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index b826d9f..4f67921 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,9 +24,10 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,9 +77,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	/** The maximum number of checkpoints to retain (at least 1). */
 	private final int maxNumberOfCheckpointsToRetain;
 
-	/** User class loader for discarding {@link CompletedCheckpoint} instances. */
-	private final ClassLoader userClassLoader;
-
 	/** Local completed checkpoints. */
 	private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
 
@@ -88,7 +87,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 *                                       least 1). Adding more checkpoints than this results
 	 *                                       in older checkpoints being discarded. On recovery,
 	 *                                       we will only start with a single checkpoint.
-	 * @param userClassLoader                The user class loader used to discard checkpoints
 	 * @param client                         The Curator ZooKeeper client
 	 * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
 	 *                                       start with a '/')
@@ -98,7 +96,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 */
 	public ZooKeeperCompletedCheckpointStore(
 			int maxNumberOfCheckpointsToRetain,
-			ClassLoader userClassLoader,
 			CuratorFramework client,
 			String checkpointsPath,
 			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
@@ -107,7 +104,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		checkNotNull(stateStorage, "State storage");
 
 		this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
-		this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
 
 		checkNotNull(client, "Curator client");
 		checkNotNull(checkpointsPath, "Checkpoints path");
@@ -172,7 +168,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 			for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
 				try {
-					removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+					removeSubsumed(initialCheckpoints.get(i));
 				}
 				catch (Exception e) {
 					LOG.error("Failed to discard checkpoint", e);
@@ -200,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
-			removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+			removeSubsumed(checkpointStateHandles.removeFirst());
 		}
 
 		LOG.debug("Added {} to {}.", checkpoint, path);
@@ -233,68 +229,90 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	}
 
 	@Override
-	public void shutdown() throws Exception {
-		LOG.info("Shutting down");
+	public void shutdown(JobStatus jobStatus) throws Exception {
+		if (jobStatus.isGloballyTerminalState()) {
+			LOG.info("Shutting down");
 
-		for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
-			try {
-				removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
-			}
-			catch (Exception e) {
-				LOG.error("Failed to discard checkpoint.", e);
+			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+				try {
+					removeShutdown(checkpoint, jobStatus);
+				} catch (Exception e) {
+					LOG.error("Failed to discard checkpoint.", e);
+				}
 			}
+
+			checkpointStateHandles.clear();
+
+			String path = "/" + client.getNamespace();
+
+			LOG.info("Removing {} from ZooKeeper", path);
+			ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+		} else {
+			LOG.info("Suspending");
+
+			// Clear the local handles, but don't remove any state
+			checkpointStateHandles.clear();
 		}
+	}
 
-		checkpointStateHandles.clear();
+	// ------------------------------------------------------------------------
 
-		String path = "/" + client.getNamespace();
+	private void removeSubsumed(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+		Callable<Void> action = new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				stateHandleAndPath.f0.retrieveState().subsume();
+				return null;
+			}
+		};
 
-		LOG.info("Removing {} from ZooKeeper", path);
-		ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+		remove(stateHandleAndPath, action);
 	}
 
-	@Override
-	public void suspend() throws Exception {
-		LOG.info("Suspending");
+	private void removeShutdown(
+			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+			final JobStatus jobStatus) throws Exception {
 
-		// Clear the local handles, but don't remove any state
-		checkpointStateHandles.clear();
+		Callable<Void> action = new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
+				checkpoint.discard(jobStatus);
+				return null;
+			}
+		};
+
+		remove(stateHandleAndPath, action);
 	}
 
 	/**
 	 * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
 	 */
-	private void removeFromZooKeeperAndDiscardCheckpoint(
-			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+	private void remove(
+			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+			final Callable<Void> action) throws Exception {
 
-		final BackgroundCallback callback = new BackgroundCallback() {
+		BackgroundCallback callback = new BackgroundCallback() {
 			@Override
 			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
 				try {
 					if (event.getType() == CuratorEventType.DELETE) {
 						if (event.getResultCode() == 0) {
-							// The checkpoint
 							try {
-								CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
-								checkpoint.discardState();
-								// Discard the checkpoint
-								LOG.debug("Discarded " + checkpoint);
+								action.call();
 							} finally {
 								// Discard the state handle
 								stateHandleAndPath.f0.discardState();
 							}
-						}
-						else {
+						} else {
 							throw new IllegalStateException("Unexpected result code " +
 									event.getResultCode() + " in '" + event + "' callback.");
 						}
-					}
-					else {
+					} else {
 						throw new IllegalStateException("Unexpected event type " +
 								event.getType() + " in '" + event + "' callback.");
 					}
-				}
-				catch (Exception e) {
+				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
deleted file mode 100644
index 49f51be..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system based {@link SavepointStore}.
- *
- * <p>Stored savepoints have the following format:
- * <pre>
- * MagicNumber SavepointVersion Savepoint
- *   - MagicNumber => int
- *   - SavepointVersion => int (returned by Savepoint#getVersion())
- *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
- * </pre>
- */
-public class FsSavepointStore implements SavepointStore {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
-
-	/** Magic number for sanity checks against stored savepoints. */
-	int MAGIC_NUMBER = 0x4960672d;
-
-	/** Root path for savepoints. */
-	private final Path rootPath;
-
-	/** Prefix for savepoint files. */
-	private final String prefix;
-
-	/** File system to use for file access. */
-	private final FileSystem fileSystem;
-
-	/**
-	 * Creates a new file system based {@link SavepointStore}.
-	 *
-	 * @param rootPath Root path for savepoints
-	 * @param prefix   Prefix for savepoint files
-	 * @throws IOException On failure to access root path
-	 */
-	FsSavepointStore(String rootPath, String prefix) throws IOException {
-		this.rootPath = new Path(checkNotNull(rootPath, "Root path"));
-		this.prefix = checkNotNull(prefix, "Prefix");
-
-		this.fileSystem = FileSystem.get(this.rootPath.toUri());
-	}
-
-	@Override
-	public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
-		Preconditions.checkNotNull(savepoint, "Savepoint");
-
-		Exception latestException = null;
-		Path path = null;
-		FSDataOutputStream fdos = null;
-
-		// Try to create a FS output stream
-		for (int attempt = 0; attempt < 10; attempt++) {
-			path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
-			try {
-				fdos = fileSystem.create(path, false);
-				break;
-			} catch (Exception e) {
-				latestException = e;
-			}
-		}
-
-		if (fdos == null) {
-			throw new IOException("Failed to create file output stream at " + path, latestException);
-		}
-
-		boolean success = false;
-		try (DataOutputStream dos = new DataOutputStream(fdos)) {
-			// Write header
-			dos.writeInt(MAGIC_NUMBER);
-			dos.writeInt(savepoint.getVersion());
-
-			// Write savepoint
-			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
-			serializer.serialize(savepoint, dos);
-			success = true;
-		} finally {
-			if (!success && fileSystem.exists(path)) {
-				if (!fileSystem.delete(path, true)) {
-					LOG.warn("Failed to delete file " + path + " after failed write.");
-				}
-			}
-		}
-
-		return path.toString();
-	}
-
-	@Override
-	public Savepoint loadSavepoint(String path) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
-
-		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
-			int magicNumber = dis.readInt();
-
-			if (magicNumber == MAGIC_NUMBER) {
-				int version = dis.readInt();
-
-				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
-				return serializer.deserialize(dis);
-			} else {
-				throw new RuntimeException("Unexpected magic number. This is most likely " +
-						"caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
-						"savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
-						"_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
-						"file is not a proper savepoint or the file has been corrupted.");
-			}
-		}
-	}
-
-	@Override
-	public void disposeSavepoint(String path) throws Exception {
-		Preconditions.checkNotNull(path, "Path");
-
-		try {
-			Savepoint savepoint = loadSavepoint(path);
-			LOG.info("Disposing savepoint: " + path);
-			savepoint.dispose();
-
-			Path filePath = new Path(path);
-
-			if (fileSystem.exists(filePath)) {
-				if (!fileSystem.delete(filePath, true)) {
-					throw new IOException("Failed to delete " + filePath + ".");
-				}
-			} else {
-				throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
-			}
-		} catch (Throwable t) {
-			throw new IOException("Failed to dispose savepoint " + path + ".", t);
-		}
-	}
-
-	@Override
-	public void shutdown() throws Exception {
-		// Nothing to do, because the savepoint life-cycle is independent of
-		// the cluster life-cycle.
-	}
-
-	private FSDataInputStream createFsInputStream(Path path) throws IOException {
-		if (fileSystem.exists(path)) {
-			return fileSystem.open(path);
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
-		}
-	}
-
-	/**
-	 * Returns the savepoint root path.
-	 *
-	 * @return Savepoint root path
-	 */
-	Path getRootPath() {
-		return rootPath;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
deleted file mode 100644
index 2cf8f31..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Heap-backed savepoint store.
- *
- * <p>The life-cycle of savepoints is bound to the life-cycle of the cluster.
- */
-public class HeapSavepointStore implements SavepointStore {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HeapSavepointStore.class);
-
-	private final Object shutDownLock = new Object();
-
-	/** Stored savepoints. */
-	private final Map<String, Savepoint> savepoints = new HashMap<>(1);
-
-	/** ID counter to identify savepoints. */
-	private final AtomicInteger currentId = new AtomicInteger();
-
-	/** Flag indicating whether state store has been shut down. */
-	private boolean shutDown;
-
-	/** Shut down hook. */
-	private final Thread shutdownHook;
-
-	/**
-	 * Creates a heap-backed savepoint store.
-	 *
-	 * <p>Savepoint are discarded on {@link #shutdown()}.
-	 */
-	public HeapSavepointStore() {
-		this.shutdownHook = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					shutdown();
-				} catch (Throwable t) {
-					LOG.warn("Failure during shut down hook.", t);
-				}
-			}
-		});
-
-		try {
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-		} catch (IllegalStateException ignored) {
-			// JVM is already shutting down
-		} catch (Throwable t) {
-			LOG.warn("Failed to register shutdown hook.");
-		}
-	}
-
-	@Override
-	public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
-		Preconditions.checkNotNull(savepoint, "Savepoint");
-
-		synchronized (shutDownLock) {
-			if (shutDown) {
-				throw new IllegalStateException("Shut down");
-			} else {
-				String path = "jobmanager://savepoints/" + currentId.incrementAndGet();
-				savepoints.put(path, savepoint);
-				return path;
-			}
-		}
-	}
-
-	@Override
-	public Savepoint loadSavepoint(String path) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
-
-		Savepoint savepoint;
-		synchronized (shutDownLock) {
-			savepoint = savepoints.get(path);
-		}
-
-		if (savepoint != null) {
-			return savepoint;
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path + "'.");
-		}
-	}
-
-	@Override
-	public void disposeSavepoint(String path) throws Exception {
-		Preconditions.checkNotNull(path, "Path");
-
-		Savepoint savepoint;
-		synchronized (shutDownLock) {
-			savepoint = savepoints.remove(path);
-		}
-
-		if (savepoint != null) {
-			savepoint.dispose();
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path + "'.");
-		}
-	}
-
-	@Override
-	public void shutdown() throws Exception {
-		synchronized (shutDownLock) {
-			// This is problematic as the user code class loader is not
-			// available at this point.
-			for (Savepoint savepoint : savepoints.values()) {
-				try {
-					savepoint.dispose();
-				} catch (Throwable t) {
-					LOG.warn("Failed to dispose savepoint " + savepoint.getCheckpointId(), t);
-				}
-			}
-
-			savepoints.clear();
-
-			// Remove shutdown hook to prevent resource leaks, unless this is
-			// invoked by the shutdown hook itself.
-			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-				try {
-					Runtime.getRuntime().removeShutdownHook(shutdownHook);
-				} catch (IllegalStateException ignored) {
-					// Race, JVM is in shutdown already, we can safely ignore this
-				} catch (Throwable t) {
-					LOG.warn("Failed to unregister shut down hook.");
-				}
-			}
-
-			shutDown = true;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 47917b4..845008d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,16 +19,18 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. 
+ * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
  */
 public class SavepointLoader {
 
@@ -39,7 +41,6 @@ public class SavepointLoader {
 	 *
 	 * @param jobId          The JobID of the job to load the savepoint for.
 	 * @param tasks          Tasks that will possibly be reset
-	 * @param savepointStore The store that holds the savepoint.
 	 * @param savepointPath  The path of the savepoint to rollback to
 	 *
 	 * @throws IllegalStateException If mismatch between program and savepoint state
@@ -48,13 +49,12 @@ public class SavepointLoader {
 	public static CompletedCheckpoint loadAndValidateSavepoint(
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
-			SavepointStore savepointStore,
-			String savepointPath) throws Exception {
+			String savepointPath) throws IOException {
 
 		// (1) load the savepoint
-		Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
+		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
-		
+
 		// (2) validate it (parallelism, etc)
 		for (TaskState taskState : savepoint.getTaskStates()) {
 			ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
@@ -66,12 +66,12 @@ public class SavepointLoader {
 				else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
 									"Max parallelism mismatch between savepoint state and new program. " +
-									"Cannot map operator %s with parallelism %d to new program with " +
+									"Cannot map operator %s with max parallelism %d to new program with " +
 									"parallelism %d. This indicates that the program has been changed " +
 									"in a non-compatible way after the savepoint.",
 							savepoint,
 							taskState.getJobVertexID(),
-							taskState.getParallelism(),
+							taskState.getMaxParallelism(),
 							executionJobVertex.getParallelism());
 
 					throw new IllegalStateException(msg);
@@ -87,8 +87,9 @@ public class SavepointLoader {
 		}
 
 		// (3) convert to checkpoint so the system can fall back to it
-		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, false);
-	} 
+		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 211209c..6a55b33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 /**
  * Serializer for {@link Savepoint} instances.
  *
- * <p>This serializer is used to read/write a savepoint via {@link FsSavepointStore}.
+ * <p>This serializer is used to read/write a savepoint via {@link SavepointStore}.
  *
  * <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 68b88d2..4b65418 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * <p>The main implementation is the {@link FsSavepointStore}. We also have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
+
+	/** Magic number for sanity checks against stored savepoints. */
+	private static final int MAGIC_NUMBER = 0x4960672d;
+
+	/** Prefix for savepoint files. */
+	private static final String prefix = "savepoint-";
 
 	/**
 	 * Stores the savepoint.
 	 *
+	 * @param targetDirectory Target directory to store savepoint in
 	 * @param savepoint Savepoint to be stored
 	 * @param <T>       Savepoint type
 	 * @return Path of stored savepoint
 	 * @throws Exception Failures during store are forwarded
 	 */
-	<T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
+	public static <T extends Savepoint> String storeSavepoint(
+			String targetDirectory,
+			T savepoint) throws IOException {
+
+		checkNotNull(targetDirectory, "Target directory");
+		checkNotNull(savepoint, "Savepoint");
+
+		Exception latestException = null;
+		Path path = null;
+		FSDataOutputStream fdos = null;
+
+		FileSystem fs = null;
+
+		// Try to create a FS output stream
+		for (int attempt = 0; attempt < 10; attempt++) {
+			path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+
+			if (fs == null) {
+				fs = FileSystem.get(path.toUri());
+			}
+
+			try {
+				fdos = fs.create(path, false);
+				break;
+			} catch (Exception e) {
+				latestException = e;
+			}
+		}
+
+		if (fdos == null) {
+			throw new IOException("Failed to create file output stream at " + path, latestException);
+		}
+
+		boolean success = false;
+		try (DataOutputStream dos = new DataOutputStream(fdos)) {
+			// Write header
+			dos.writeInt(MAGIC_NUMBER);
+			dos.writeInt(savepoint.getVersion());
+
+			// Write savepoint
+			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
+			serializer.serialize(savepoint, dos);
+			success = true;
+		} finally {
+			if (!success && fs.exists(path)) {
+				if (!fs.delete(path, true)) {
+					LOG.warn("Failed to delete file {} after failed write.", path);
+				}
+			}
+		}
+
+		return path.toString();
+	}
 
 	/**
 	 * Loads the savepoint at the specified path.
@@ -43,24 +125,62 @@ public interface SavepointStore {
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwared
 	 */
-	Savepoint loadSavepoint(String path) throws Exception;
+	public static Savepoint loadSavepoint(String path) throws IOException {
+		Preconditions.checkNotNull(path, "Path");
 
-	/**
-	 * Disposes the savepoint at the specified path.
-	 *
-	 * @param path        Path of savepoint to dispose
-	 * @throws Exception Failures during diposal are forwarded
-	 */
-	void disposeSavepoint(String path) throws Exception;
+		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+			int magicNumber = dis.readInt();
+
+			if (magicNumber == MAGIC_NUMBER) {
+				int version = dis.readInt();
+
+				SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+				return serializer.deserialize(dis);
+			} else {
+				throw new RuntimeException("Unexpected magic number. This is most likely " +
+						"caused by trying to load a Flink 1.0 savepoint. You cannot load a " +
+						"savepoint triggered by Flink 1.0 with this version of Flink. If it is " +
+						"_not_ a Flink 1.0 savepoint, this error indicates that the specified " +
+						"file is not a proper savepoint or the file has been corrupted.");
+			}
+		}
+	}
 
 	/**
-	 * Shut downs the savepoint store.
+	 * Removes the savepoint meta data without loading and disposing the savepoint.
 	 *
-	 * <p>Only necessary for implementations where the savepoint life-cycle is
-	 * bound to the cluster life-cycle.
-	 *
-	 * @throws Exception Failures during shut down are forwarded
+	 * @param path Path of savepoint to remove
+	 * @throws IOException Failures during removal are forwarded
 	 */
-	void shutdown() throws Exception;
+	public static void removeSavepoint(String path) throws IOException {
+		Preconditions.checkNotNull(path, "Path");
+
+		try {
+			LOG.info("Removing savepoint: {}.", path);
+
+			Path filePath = new Path(path);
+			FileSystem fs = FileSystem.get(filePath.toUri());
+
+			if (fs.exists(filePath)) {
+				if (!fs.delete(filePath, true)) {
+					throw new IOException("Failed to delete " + filePath + ".");
+				}
+			} else {
+				throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
+			}
+		} catch (Throwable t) {
+			throw new IOException("Failed to dispose savepoint " + path + ".", t);
+		}
+	}
+
+	private static FSDataInputStream createFsInputStream(Path path) throws IOException {
+		FileSystem fs = FileSystem.get(path.toUri());
+
+		if (fs.exists(path)) {
+			return fs.open(path);
+		} else {
+			throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
+		}
+	}
 
 }