You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:57 UTC

[06/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery

[FLINK-2354] [runtime] Add job graph and checkpoint recovery

This closes #1153.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73c73e92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73c73e92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73c73e92

Branch: refs/heads/master
Commit: 73c73e92750ab8fb068d0a3cb37afcb642084fc0
Parents: 3aaee1e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Sep 1 17:25:46 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:51 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  21 +
 .../checkpoint/CheckpointCoordinator.java       | 223 +++---
 .../runtime/checkpoint/CheckpointIDCounter.java |  43 ++
 .../checkpoint/CheckpointRecoveryFactory.java   |  61 ++
 .../runtime/checkpoint/CompletedCheckpoint.java |  81 +++
 .../checkpoint/CompletedCheckpointStore.java    |  69 ++
 .../runtime/checkpoint/PendingCheckpoint.java   |   6 +-
 .../StandaloneCheckpointIDCounter.java          |  47 ++
 .../StandaloneCheckpointRecoveryFactory.java    |  52 ++
 .../StandaloneCompletedCheckpointStore.java     | 100 +++
 .../flink/runtime/checkpoint/StateForTask.java  |  21 +-
 .../checkpoint/SuccessfulCheckpoint.java        |  82 ---
 .../ZooKeeperCheckpointIDCounter.java           | 130 ++++
 .../ZooKeeperCheckpointRecoveryFactory.java     |  66 ++
 .../ZooKeeperCompletedCheckpointStore.java      | 293 ++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |  51 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   5 +
 .../flink/runtime/jobmanager/RecoveryMode.java  |  13 +
 .../StandaloneSubmittedJobGraphStore.java       |  65 ++
 .../runtime/jobmanager/SubmittedJobGraph.java   |  77 ++
 .../jobmanager/SubmittedJobGraphStore.java      |  93 +++
 .../ZooKeeperSubmittedJobGraphStore.java        | 379 ++++++++++
 .../leaderelection/LeaderElectionService.java   |   1 +
 .../ZooKeeperLeaderElectionService.java         |   1 +
 .../flink/runtime/state/StateBackend.java       |  39 +
 .../state/StateHandleProviderFactory.java       |  61 ++
 .../flink/runtime/util/LeaderElectionUtils.java |  67 --
 .../flink/runtime/util/ZooKeeperUtils.java      | 138 +++-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 384 ++++++++++
 .../flink/runtime/jobmanager/JobInfo.scala      |  25 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 594 ++++++++++-----
 .../runtime/messages/JobManagerMessages.scala   |  16 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   4 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 190 ++---
 .../checkpoint/CheckpointIDCounterTest.java     | 194 +++++
 .../checkpoint/CheckpointStateRestoreTest.java  |  19 +-
 .../CompletedCheckpointStoreTest.java           | 297 ++++++++
 .../StandaloneCompletedCheckpointStoreTest.java |  33 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 101 +++
 .../BlobLibraryCacheManagerTest.java            |   4 +
 .../PartitionRequestClientFactoryTest.java      |   2 +-
 ...ManagerSubmittedJobGraphsRecoveryITCase.java | 460 ++++++++++++
 .../StandaloneSubmittedJobGraphStoreTest.java   |  53 ++
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 283 ++++++++
 .../JobManagerLeaderElectionTest.java           |  27 +-
 .../TestingLeaderElectionService.java           |   4 +
 .../ZooKeeperLeaderRetrievalTest.java           |  21 +-
 .../messages/CheckpointMessagesTest.java        |   2 +-
 .../runtime/taskmanager/TaskCancelTest.java     |  67 +-
 .../runtime/testutils/CommonTestUtils.java      |  75 +-
 .../testutils/JobManagerActorTestUtils.java     | 166 +++++
 .../runtime/testutils/JobManagerProcess.java    | 226 ++++++
 .../runtime/testutils/TaskManagerProcess.java   | 133 ++++
 .../flink/runtime/testutils/TestJvmProcess.java | 267 +++++++
 .../runtime/testutils/ZooKeeperTestUtils.java   |  94 +++
 .../ZooKeeperStateHandleStoreITCase.java        | 591 +++++++++++++++
 .../zookeeper/ZooKeeperTestEnvironment.java     | 133 ++++
 .../ExecutionGraphRestartTest.scala             |  21 +-
 .../runtime/testingUtils/TestingCluster.scala   |  44 +-
 .../testingUtils/TestingJobManager.scala        |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  36 +-
 .../checkpointing/StateCheckpoinedITCase.java   | 391 ----------
 .../checkpointing/StateCheckpointedITCase.java  | 391 ++++++++++
 ...tJobManagerProcessFailureRecoveryITCase.java | 289 ++++++++
 .../AbstractProcessFailureRecoveryTest.java     | 444 ------------
 ...ctTaskManagerProcessFailureRecoveryTest.java | 397 +++++++++++
 .../flink/test/recovery/ChaosMonkeyITCase.java  | 713 +++++++++++++++++++
 .../JobManagerCheckpointRecoveryITCase.java     | 395 ++++++++++
 ...anagerProcessFailureBatchRecoveryITCase.java | 140 ++++
 .../ProcessFailureBatchRecoveryITCase.java      | 115 ---
 .../recovery/ProcessFailureCancelingITCase.java |   4 +-
 .../ProcessFailureStreamingRecoveryITCase.java  | 234 ------
 ...anagerProcessFailureBatchRecoveryITCase.java | 115 +++
 ...erProcessFailureStreamingRecoveryITCase.java | 234 ++++++
 .../ZooKeeperLeaderElectionITCase.java          |  61 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  11 +-
 77 files changed, 8901 insertions(+), 1918 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5d6f1c7..be730a0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -407,6 +407,12 @@ public final class ConfigConstants {
 	 */
 	public static final String STATE_BACKEND = "state.backend";
 	
+	/**
+	 * File system state backend base path for recoverable state handles. Recovery state is written
+	 * to this path and the file state handles are persisted for recovery.
+	 */
+	public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";
+	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
@@ -433,6 +439,15 @@ public final class ConfigConstants {
 
 	public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
 
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs";
+
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints";
+
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter";
+
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
 
 	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
@@ -699,6 +714,12 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
 
+	public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
+
+	public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
+
+	public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
+
 	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
 	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8f0b19b..fdb59d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,16 +21,16 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -48,13 +47,19 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
  * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
  * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
  * reported by the tasks that acknowledge the checkpoint.
+ *
+ * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
+ * CompletedCheckpointStore} and {@link CheckpointIDCounter} changes. The default standalone
+ * implementations don't support any recovery.
  */
 public class CheckpointCoordinator {
 	
@@ -79,12 +84,20 @@ public class CheckpointCoordinator {
 	private final ExecutionVertex[] tasksToCommitTo;
 
 	private final Map<Long, PendingCheckpoint> pendingCheckpoints;
-	
-	private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
+
+	/**
+	 * Completed checkpoints. Implementations can be blocking. Make sure calls to methods
+	 * accessing this run asynchronously, so that they don't block the job manager actor.
+	 */
+	private final CompletedCheckpointStore completedCheckpointStore;
 	
 	private final ArrayDeque<Long> recentPendingCheckpoints;
 
-	private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+	/**
+	 * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
+	 * need to be ascending across job managers.
+	 */
+	private final CheckpointIDCounter checkpointIdCounter;
 
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
 
@@ -93,8 +106,6 @@ public class CheckpointCoordinator {
 	
 	private final long checkpointTimeout;
 	
-	private final int numSuccessfulCheckpointsToRetain;
-	
 	private TimerTask periodicScheduler;
 	
 	private ActorGateway jobStatusListener;
@@ -110,61 +121,62 @@ public class CheckpointCoordinator {
 
 	public CheckpointCoordinator(
 			JobID job,
-			int numSuccessfulCheckpointsToRetain,
 			long checkpointTimeout,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
-			ClassLoader userClassLoader) {
+			ClassLoader userClassLoader,
+			CheckpointIDCounter checkpointIDCounter,
+			CompletedCheckpointStore completedCheckpointStore,
+			RecoveryMode recoveryMode) throws Exception {
 		
-		// some sanity checks
-		if (job == null || tasksToTrigger == null ||
-				tasksToWaitFor == null || tasksToCommitTo == null) {
-			throw new NullPointerException();
-		}
-		if (numSuccessfulCheckpointsToRetain < 1) {
-			throw new IllegalArgumentException("Must retain at least one successful checkpoint");
-		}
-		if (checkpointTimeout < 1) {
-			throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
-		}
+		// Sanity check
+		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		
-		this.job = job;
-		this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
+		this.job = checkNotNull(job);
 		this.checkpointTimeout = checkpointTimeout;
-		this.tasksToTrigger = tasksToTrigger;
-		this.tasksToWaitFor = tasksToWaitFor;
-		this.tasksToCommitTo = tasksToCommitTo;
+		this.tasksToTrigger = checkNotNull(tasksToTrigger);
+		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
+		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
 		this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
-		this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
+		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
 		this.userClassLoader = userClassLoader;
+		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+		checkpointIDCounter.start();
 
-		timer = new Timer("Checkpoint Timer", true);
+		this.timer = new Timer("Checkpoint Timer", true);
 
-		// Add shutdown hook to clean up state handles
-		shutdownHook = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					CheckpointCoordinator.this.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error during shutdown of blob service via JVM shutdown hook: " +
-							t.getMessage(), t);
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			// Add shutdown hook to clean up state handles when no checkpoint recovery is
+			// possible. In case of another configured recovery mode, the checkpoints need to be
+			// available for the standby job managers.
+			this.shutdownHook = new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						CheckpointCoordinator.this.shutdown();
+					}
+					catch (Throwable t) {
+						LOG.error("Error during shutdown of checkpoint coordinator via " +
+								"JVM shutdown hook: " + t.getMessage(), t);
+					}
 				}
-			}
-		});
+			});
 
-		try {
-			// Add JVM shutdown hook to call shutdown of service
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-		}
-		catch (IllegalStateException ignored) {
-			// JVM is already shutting down. No need to do anything.
+			try {
+				// Add JVM shutdown hook to call shutdown of service
+				Runtime.getRuntime().addShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException ignored) {
+				// JVM is already shutting down. No need to do anything.
+			}
+			catch (Throwable t) {
+				LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+			}
 		}
-		catch (Throwable t) {
-			LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+		else {
+			this.shutdownHook = null;
 		}
 	}
 
@@ -178,41 +190,39 @@ public class CheckpointCoordinator {
 	 * After this method has been called, the coordinator does not accept and further
 	 * messages and cannot trigger any further checkpoints.
 	 */
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		synchronized (lock) {
-			try {	
-				if (shutdown) {
-					return;
-				}
-				shutdown = true;
-				LOG.info("Stopping checkpoint coordinator for job " + job);
-			
-				// shut down the thread that handles the timeouts
-				timer.cancel();
-			
-				// make sure that the actor does not linger
-				if (jobStatusListener != null) {
-					jobStatusListener.tell(PoisonPill.getInstance());
-					jobStatusListener = null;
-				}
-			
-				// the scheduling thread needs also to go away
-				if (periodicScheduler != null) {
-					periodicScheduler.cancel();
-					periodicScheduler = null;
-				}
-			
-				// clear and discard all pending checkpoints
-				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
-						pending.discard(userClassLoader, true);
-				}
-				pendingCheckpoints.clear();
-			
-				// clean and discard all successful checkpoints
-				for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-					checkpoint.discard(userClassLoader);
+			try {
+				if (!shutdown) {
+					shutdown = true;
+					LOG.info("Stopping checkpoint coordinator for job " + job);
+
+					// shut down the thread that handles the timeouts
+					timer.cancel();
+
+					// make sure that the actor does not linger
+					if (jobStatusListener != null) {
+						jobStatusListener.tell(PoisonPill.getInstance());
+						jobStatusListener = null;
+					}
+
+					// the scheduling thread needs also to go away
+					if (periodicScheduler != null) {
+						periodicScheduler.cancel();
+						periodicScheduler = null;
+					}
+
+					checkpointIdCounter.stop();
+
+					// clear and discard all pending checkpoints
+					for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+							pending.discard(userClassLoader, true);
+					}
+					pendingCheckpoints.clear();
+
+					// clean and discard all successful checkpoints
+					completedCheckpointStore.discardAllCheckpoints();
 				}
-				completedCheckpoints.clear();
 			}
 			finally {
 				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
@@ -244,7 +254,7 @@ public class CheckpointCoordinator {
 	 * Triggers a new checkpoint and uses the current system time as the
 	 * checkpoint time.
 	 */
-	public void triggerCheckpoint() {
+	public void triggerCheckpoint() throws Exception {
 		triggerCheckpoint(System.currentTimeMillis());
 	}
 
@@ -254,7 +264,7 @@ public class CheckpointCoordinator {
 	 * 
 	 * @param timestamp The timestamp for the checkpoint.
 	 */
-	public boolean triggerCheckpoint(final long timestamp) {
+	public boolean triggerCheckpoint(final long timestamp) throws Exception {
 		if (shutdown) {
 			LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
 			return false;
@@ -354,7 +364,7 @@ public class CheckpointCoordinator {
 		}
 	}
 	
-	public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
+	public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
 		if (shutdown || message == null) {
 			return;
 		}
@@ -365,7 +375,7 @@ public class CheckpointCoordinator {
 		
 		final long checkpointId = message.getCheckpointId();
 
-		SuccessfulCheckpoint completed = null;
+		CompletedCheckpoint completed = null;
 		PendingCheckpoint checkpoint;
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
@@ -380,13 +390,13 @@ public class CheckpointCoordinator {
 				if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
 					
 					if (checkpoint.isFullyAcknowledged()) {
+						completed = checkpoint.toCompletedCheckpoint();
+
+						completedCheckpointStore.addCheckpoint(completed);
+
 						LOG.info("Completed checkpoint " + checkpointId);
+						LOG.debug(completed.getStates().toString());
 
-						completed = checkpoint.toCompletedCheckpoint();
-						completedCheckpoints.addLast(completed);
-						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
-							completedCheckpoints.removeFirst().discard(userClassLoader);
-						}
 						pendingCheckpoints.remove(checkpointId);
 						rememberRecentCheckpointId(checkpointId);
 						
@@ -456,25 +466,30 @@ public class CheckpointCoordinator {
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
 
-	public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks,
-												boolean errorIfNoCheckpoint,
-												boolean allOrNothingState) throws Exception {
+	public void restoreLatestCheckpointedState(
+			Map<JobVertexID, ExecutionJobVertex> tasks,
+			boolean errorIfNoCheckpoint,
+			boolean allOrNothingState) throws Exception {
+
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
-			
-			if (completedCheckpoints.isEmpty()) {
+
+			// Recover the checkpoints
+			completedCheckpointStore.recover();
+
+			// restore from the latest checkpoint
+			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
+
+			if (latest == null) {
 				if (errorIfNoCheckpoint) {
 					throw new IllegalStateException("No completed checkpoint available");
 				} else {
 					return;
 				}
 			}
-			
-			// restore from the latest checkpoint
-			SuccessfulCheckpoint latest = completedCheckpoints.getLast();
-						
+
 			if (allOrNothingState) {
 				Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
 
@@ -519,7 +534,9 @@ public class CheckpointCoordinator {
 	}
 
 	public int getNumberOfRetainedSuccessfulCheckpoints() {
-		return this.completedCheckpoints.size();
+		synchronized (lock) {
+			return completedCheckpointStore.getNumberOfRetainedCheckpoints();
+		}
 	}
 
 	public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
@@ -528,9 +545,9 @@ public class CheckpointCoordinator {
 		}
 	}
 	
-	public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
+	public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
 		synchronized (lock) {
-			return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
+			return completedCheckpointStore.getAllCheckpoints();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
new file mode 100644
index 0000000..34b7946
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Counter that hands out ascending checkpoint IDs.
+ */
+public interface CheckpointIDCounter {
+
+	/**
+	 * Starts the {@link CheckpointIDCounter} service.
+	 */
+	void start() throws Exception;
+
+	/**
+	 * Stops the {@link CheckpointIDCounter} service.
+	 */
+	void stop() throws Exception;
+
+	/**
+	 * Atomically increments the current checkpoint ID.
+	 *
+	 * @return The value of the counter before it was incremented
+	 */
+	long getAndIncrement() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
new file mode 100644
index 0000000..aa6e94b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * A factory for per-job checkpoint recovery components.
+ */
+public interface CheckpointRecoveryFactory {
+
+	/**
+	 * The number of {@link CompletedCheckpoint} instances to retain.
+	 */
+	int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
+
+	/**
+	 * Starts the {@link CheckpointRecoveryFactory} service.
+	 */
+	void start();
+
+	/**
+	 * Stops the {@link CheckpointRecoveryFactory} service.
+	 */
+	void stop();
+
+	/**
+	 * Creates a {@link CompletedCheckpointStore} instance for a job.
+	 *
+	 * @param jobId           ID of the job whose completed checkpoints are stored
+	 * @param userClassLoader User code class loader of the job
+	 * @return {@link CompletedCheckpointStore} instance for the job
+	 */
+	CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+			throws Exception;
+
+	/**
+	 * Creates a {@link CheckpointIDCounter} instance for a job.
+	 *
+	 * @param jobId ID of the job whose checkpoint IDs are counted
+	 * @return {@link CheckpointIDCounter} instance for the job
+	 */
+	CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
new file mode 100644
index 0000000..ea3c26d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A completed checkpoint describes a checkpoint that all required tasks have acknowledged (with their state)
+ * and that is therefore considered completed.
+ */
+public class CompletedCheckpoint implements Serializable {
+
+	private static final long serialVersionUID = -8360248179615702014L;
+
+	private final JobID job;
+	
+	private final long checkpointID;
+	
+	private final long timestamp;
+	// Per-task states; cleared by discard() and returned without copying by getStates()
+	private final ArrayList<StateForTask> states;
+
+	public CompletedCheckpoint(JobID job, long checkpointID, long timestamp, ArrayList<StateForTask> states) {
+		this.job = job;
+		this.checkpointID = checkpointID;
+		this.timestamp = timestamp;
+		this.states = states;
+	}
+
+	public JobID getJobId() {
+		return job;
+	}
+
+	public long getCheckpointID() {
+		return checkpointID;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public List<StateForTask> getStates() {
+		return states;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Discards the state of every task with the given class loader, then empties the state list.
+	public void discard(ClassLoader userClassLoader) {
+		for(StateForTask state: states){
+			state.discard(userClassLoader);
+		}
+		states.clear();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
new file mode 100644
index 0000000..d024aea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.List;
+
+/**
+ * A bounded store of {@link CompletedCheckpoint} instances that evicts the oldest one first.
+ */
+public interface CompletedCheckpointStore {
+
+	/**
+	 * Recovers available {@link CompletedCheckpoint} instances.
+	 *
+	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
+	 * available checkpoint.
+	 */
+	void recover() throws Exception;
+
+	/**
+	 * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
+	 *
+	 * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
+	 * retained checkpoints, the oldest one will be discarded via {@link
+	 * CompletedCheckpoint#discard(ClassLoader)}.
+	 */
+	void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
+
+	/**
+	 * Returns the most recently added {@link CompletedCheckpoint} instance, or {@code null} if
+	 * none was added.
+	 */
+	CompletedCheckpoint getLatestCheckpoint() throws Exception;
+
+	/**
+	 * Discards all added {@link CompletedCheckpoint} instances via {@link
+	 * CompletedCheckpoint#discard(ClassLoader)}.
+	 */
+	void discardAllCheckpoints() throws Exception;
+
+	/**
+	 * Returns all retained {@link CompletedCheckpoint} instances.
+	 *
+	 * <p>Returns an empty list if no checkpoint has been added yet.
+	 */
+	List<CompletedCheckpoint> getAllCheckpoints() throws Exception;
+
+	/**
+	 * Returns the current number of retained checkpoints.
+	 */
+	int getNumberOfRetainedCheckpoints();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370ae50..81159f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.SerializedValue;
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
- * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ * acknowledged it, it becomes a {@link CompletedCheckpoint}.
  * 
  * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
  * state handles always as serialized values, never as actual values.</p>
@@ -109,13 +109,13 @@ public class PendingCheckpoint {
 		return collectedStates;
 	}
 	
-	public SuccessfulCheckpoint toCompletedCheckpoint() {
+	public CompletedCheckpoint toCompletedCheckpoint() {
 		synchronized (lock) {
 			if (discarded) {
 				throw new IllegalStateException("pending checkpoint is discarded");
 			}
 			if (notYetAcknowledgedTasks.isEmpty()) {
-				SuccessfulCheckpoint completed =  new SuccessfulCheckpoint(jobId, checkpointId,
+				CompletedCheckpoint completed =  new CompletedCheckpoint(jobId, checkpointId,
 						checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
 				discard(null, false);
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
new file mode 100644
index 0000000..052d743
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
+ * recoverable in this recovery mode.
+ */
+public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
+
+	/** Next checkpoint ID to hand out; IDs start at 1. */
+	private final AtomicLong nextCheckpointId = new AtomicLong(1);
+
+	@Override
+	public void start() throws Exception {} // no external resources to acquire
+
+	@Override
+	public void stop() throws Exception {} // no external resources to release
+
+	/** Returns the current ID and atomically advances the counter by one. */
+	@Override
+	public long getAndIncrement() throws Exception {
+		return nextCheckpointId.getAndIncrement();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..324a0be
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+/**
+ * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}.
+ */
+public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+	/** No-op: the standalone components hold no external resources. */
+	@Override
+	public void start() {}
+
+	/** No-op: the standalone components hold no external resources. */
+	@Override
+	public void stop() {}
+
+	/** Creates an in-memory store; the job ID is not needed in standalone mode. */
+	@Override
+	public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+			throws Exception {
+		// Retention bound defined by the CheckpointRecoveryFactory interface.
+		final int retained = NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN;
+		return new StandaloneCompletedCheckpointStore(retained, userClassLoader);
+	}
+
+	/** Creates an in-memory counter; the job ID is not needed in standalone mode. */
+	@Override
+	public CheckpointIDCounter createCheckpointIDCounter(JobID ignored) {
+		return new StandaloneCheckpointIDCounter();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
new file mode 100644
index 0000000..c31606a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ */
+class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+
+	/** Upper bound on the number of retained checkpoints (always at least 1). */
+	private final int maxRetained;
+
+	/** Class loader handed to {@link CompletedCheckpoint#discard(ClassLoader)}. */
+	private final ClassLoader discardClassLoader;
+
+	/** Retained checkpoints, oldest at the head and newest at the tail. */
+	private final ArrayDeque<CompletedCheckpoint> retained;
+
+	/**
+	 * Creates an in-memory store that keeps at most {@code maxNumberOfCheckpointsToRetain}
+	 * checkpoints and discards the oldest one whenever that bound is exceeded.
+	 *
+	 * @param maxNumberOfCheckpointsToRetain Maximum number of retained checkpoints (at least 1)
+	 * @param userClassLoader                Class loader used when discarding checkpoints
+	 * @throws IllegalArgumentException If {@code maxNumberOfCheckpointsToRetain} is below 1
+	 * @throws NullPointerException     If {@code userClassLoader} is {@code null}
+	 */
+	public StandaloneCompletedCheckpointStore(
+			int maxNumberOfCheckpointsToRetain,
+			ClassLoader userClassLoader) {
+		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+		this.maxRetained = maxNumberOfCheckpointsToRetain;
+		this.discardClassLoader = checkNotNull(userClassLoader, "User class loader");
+		// One spare slot: addCheckpoint briefly holds max + 1 entries before evicting.
+		this.retained = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+	}
+
+	/** Nothing to recover: standalone checkpoints exist only in this store's memory. */
+	@Override
+	public void recover() throws Exception {}
+
+	@Override
+	public void addCheckpoint(CompletedCheckpoint checkpoint) {
+		retained.addLast(checkpoint);
+		boolean overCapacity = retained.size() > maxRetained;
+		if (overCapacity) {
+			CompletedCheckpoint oldest = retained.removeFirst();
+			oldest.discard(discardClassLoader);
+		}
+	}
+
+	@Override
+	public CompletedCheckpoint getLatestCheckpoint() {
+		return retained.peekLast(); // null when no checkpoint is retained
+	}
+
+	@Override
+	public List<CompletedCheckpoint> getAllCheckpoints() {
+		// Copy so that callers cannot mutate the internal deque.
+		return new ArrayList<CompletedCheckpoint>(retained);
+	}
+
+	@Override
+	public int getNumberOfRetainedCheckpoints() {
+		return retained.size();
+	}
+
+	@Override
+	public void discardAllCheckpoints() {
+		for (CompletedCheckpoint completed : retained) {
+			completed.discard(discardClassLoader);
+		}
+		retained.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 120c503..d1428f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -24,6 +24,11 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Simple bean to describe the state belonging to a parallel operator.
  * Since we hold the state across execution attempts, we identify a task by its
@@ -34,8 +39,10 @@ import org.slf4j.LoggerFactory;
  * Furthermore, the state may involve user-defined classes that are not accessible without
  * the respective classloader.
  */
-public class StateForTask {
-	
+public class StateForTask implements Serializable {
+
+	private static final long serialVersionUID = -2394696997971923995L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class);
 
 	/** The state of the parallel operator */
@@ -48,12 +55,10 @@ public class StateForTask {
 	private final int subtask;
 	
 	public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
-	if (state == null || operatorId == null || subtask < 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.state = state;
-		this.operatorId = operatorId;
+		this.state = checkNotNull(state, "State");
+		this.operatorId = checkNotNull(operatorId, "Operator ID");
+
+		checkArgument(subtask >= 0, "Negative subtask index");
 		this.subtask = subtask;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
deleted file mode 100644
index be0b301..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
- * and that is considered completed.
- */
-public class SuccessfulCheckpoint {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
-	
-	private final JobID job;
-	
-	private final long checkpointID;
-	
-	private final long timestamp;
-	
-	private final List<StateForTask> states;
-
-
-	public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
-		this.job = job;
-		this.checkpointID = checkpointID;
-		this.timestamp = timestamp;
-		this.states = states;
-	}
-
-	public JobID getJobId() {
-		return job;
-	}
-
-	public long getCheckpointID() {
-		return checkpointID;
-	}
-
-	public long getTimestamp() {
-		return timestamp;
-	}
-
-	public List<StateForTask> getStates() {
-		return states;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void discard(ClassLoader userClassLoader) {
-		for(StateForTask state: states){
-			state.discard(userClassLoader);
-		}
-		states.clear();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
new file mode 100644
index 0000000..6673050
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each counter creates a ZNode:
+ * <pre>
+ * +----O /flink/checkpoint-counter/&lt;job-id&gt; 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/checkpoint-counter/&lt;job-id&gt; N [persistent]
+ * </pre>
+ *
+ * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case
+ * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
+ */
+public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);
+
+	/** Curator ZooKeeper client */
+	private final CuratorFramework client;
+
+	/** Path of the shared count */
+	private final String counterPath;
+
+	/** Curator recipe for shared counts (int-valued, which bounds the largest checkpoint ID) */
+	private final SharedCount sharedCount;
+
+	/** Connection state listener to monitor the client connection */
+	private final SharedCountConnectionStateListener connStateListener = new SharedCountConnectionStateListener();
+
+	/**
+	 * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+	 *
+	 * @param client      Curator ZooKeeper client
+	 * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
+	 * @throws NullPointerException If {@code client} or {@code counterPath} is {@code null}
+	 */
+	public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception {
+		this.client = checkNotNull(client, "Curator client");
+		this.counterPath = checkNotNull(counterPath, "Counter path");
+		this.sharedCount = new SharedCount(client, counterPath, 1);
+	}
+
+	@Override
+	public void start() throws Exception {
+		sharedCount.start();
+		client.getConnectionStateListenable().addListener(connStateListener);
+	}
+
+	@Override
+	public void stop() throws Exception {
+		sharedCount.close();
+		client.getConnectionStateListenable().removeListener(connStateListener);
+		LOG.info("Removing {} from ZooKeeper", counterPath);
+		client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+	}
+
+	/** Returns the current shared count and atomically advances it by one in ZooKeeper. */
+	@Override
+	public long getAndIncrement() throws Exception {
+		while (true) {
+			ConnectionState connState = connStateListener.getLastState();
+			if (connState != null) {
+				throw new IllegalStateException("Connection state: " + connState);
+			}
+			VersionedValue<Integer> current = sharedCount.getVersionedValue();
+			int currentCount = current.getValue();
+			if (currentCount == Integer.MAX_VALUE) {
+				// Incrementing would wrap to a negative ID and break the ascending-ID guarantee.
+				throw new IllegalStateException("Checkpoint ID counter overflow at " + currentCount + ".");
+			}
+			// Version-checked update: fails (and we retry) if another writer updated it first.
+			if (sharedCount.trySetCount(current, currentCount + 1)) {
+				return currentCount;
+			}
+		}
+	}
+
+	/**
+	 * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
+	 * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
+	 * The recorded state is never reset, so getAndIncrement() keeps failing afterwards.
+	 */
+	private static class SharedCountConnectionStateListener implements ConnectionStateListener {
+		private volatile ConnectionState lastState;
+
+		@Override
+		public void stateChanged(CuratorFramework client, ConnectionState newState) {
+			if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
+				lastState = newState;
+			}
+		}
+
+		private ConnectionState getLastState() {
+			return lastState;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..2659e7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}.
+ */
+public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+	/** Curator client shared by every store and counter this factory creates. */
+	private final CuratorFramework client;
+
+	/** Configuration forwarded to {@link ZooKeeperUtils} when creating components. */
+	private final Configuration config;
+
+	public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) {
+		this.client = checkNotNull(client, "Curator client");
+		this.config = checkNotNull(config, "Configuration");
+	}
+
+	/** No-op; this factory does not start the Curator client. */
+	@Override
+	public void start() {}
+
+	/** Closes the Curator client that was passed to the constructor. */
+	@Override
+	public void stop() {
+		client.close();
+	}
+
+	@Override
+	public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+			throws Exception {
+		final int retained = NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN;
+		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId, retained, userClassLoader);
+	}
+
+	@Override
+	public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
+		return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
new file mode 100644
index 0000000..62ab440
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Checkpoints are added under a ZNode per job:
+ * <pre>
+ * +----O /flink/checkpoints/&lt;job-id&gt;  [persistent]
+ * .    |
+ * .    +----O /flink/checkpoints/&lt;job-id&gt;/1 [persistent]
+ * .    .                                  .
+ * .    .                                  .
+ * .    .                                  .
+ * .    +----O /flink/checkpoints/&lt;job-id&gt;/N [persistent]
+ * </pre>
+ *
+ * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
+ * only the latest one is used and older ones are discarded (even if the maximum number
+ * of retained checkpoints is greater than one).
+ *
+ * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
+ * same program, it is OK to take any valid successful checkpoint as long as the "history" of
+ * checkpoints is consistent. Currently, after recovery we start out with only a single
+ * checkpoint to circumvent those situations.
+ */
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+
+	/** Curator ZooKeeper client */
+	private final CuratorFramework client;
+
+	/** Completed checkpoints in ZooKeeper */
+	private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
+
+	/** The maximum number of checkpoints to retain (at least 1). */
+	private final int maxNumberOfCheckpointsToRetain;
+
+	/** User class loader for discarding {@link CompletedCheckpoint} instances. */
+	private final ClassLoader userClassLoader;
+
+	/** Local completed checkpoints (oldest first); must mirror the ZNodes in ZooKeeper. */
+	private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+
+	/**
+	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+	 *
+	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+	 *                                       least 1). Adding more checkpoints than this results
+	 *                                       in older checkpoints being discarded. On recovery,
+	 *                                       we will only start with a single checkpoint.
+	 * @param userClassLoader                The user class loader used to discard checkpoints
+	 * @param client                         The Curator ZooKeeper client
+	 * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
+	 *                                       start with a '/')
+	 * @param stateHandleProvider            The state handle provider for checkpoints
+	 * @throws Exception If ensuring the checkpoints path in ZooKeeper fails
+	 */
+	public ZooKeeperCompletedCheckpointStore(
+			int maxNumberOfCheckpointsToRetain,
+			ClassLoader userClassLoader,
+			CuratorFramework client,
+			String checkpointsPath,
+			StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {
+
+		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+
+		this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
+		this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
+
+		checkNotNull(client, "Curator client");
+		checkNotNull(checkpointsPath, "Checkpoints path");
+		checkNotNull(stateHandleProvider, "State handle provider");
+
+		// Ensure that the checkpoints path exists
+		client.newNamespaceAwareEnsurePath(checkpointsPath)
+				.ensure(client.getZookeeperClient());
+
+		// All operations will have the path as root
+		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
+
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
+				this.client, stateHandleProvider);
+
+		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+
+		LOG.info("Initialized in '{}'.", checkpointsPath);
+	}
+
+	/**
+	 * Gets the latest checkpoint from ZooKeeper and removes all others.
+	 *
+	 * <p><strong>Important</strong>: Even if there is more than one checkpoint in ZooKeeper,
+	 * this will only recover the latest and discard the others. Otherwise, there is no guarantee
+	 * that the history of checkpoints is consistent.
+	 */
+	@Override
+	public void recover() throws Exception {
+		LOG.info("Recovering checkpoints from ZooKeeper.");
+		// ZooKeeper is the source of truth: drop local handles so repeated calls don't duplicate them.
+		checkpointStateHandles.clear();
+		List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
+		while (true) {
+			try {
+				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
+				break;
+			}
+			catch (ConcurrentModificationException e) {
+				LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+			}
+		}
+
+		int numberOfInitialCheckpoints = initialCheckpoints.size();
+
+		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
+
+		if (numberOfInitialCheckpoints > 0) {
+			// Take the last one. This is the latest checkpoint, because the zero-padded path
+			// names sort in checkpoint ID order (see addCheckpoint).
+			Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
+					.get(numberOfInitialCheckpoints - 1);
+
+			CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
+
+			checkpointStateHandles.add(latest);
+
+			LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
+
+			for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
+				try {
+					removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+				}
+				catch (Exception e) {
+					LOG.error("Failed to discard checkpoint", e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones.
+	 *
+	 * @param checkpoint Completed checkpoint to add.
+	 */
+	@Override
+	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+		checkNotNull(checkpoint, "Checkpoint");
+
+		// First add the new one. If it fails, we don't want to lose existing data.
+		// Zero-pad the ID so that sorting ZNodes by name (see recover) matches numeric ID order.
+		String path = String.format("/%019d", checkpoint.getCheckpointID());
+		final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+
+		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+
+		// Everything worked, let's remove a previous checkpoint if necessary.
+		if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+			removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+		}
+
+		LOG.debug("Added {} to {}.", checkpoint, path);
+	}
+
+	@Override
+	public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+		if (checkpointStateHandles.isEmpty()) {
+			return null;
+		}
+		else {
+			return checkpointStateHandles.getLast().f0.getState(userClassLoader);
+		}
+	}
+
+	@Override
+	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+		List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
+
+		for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
+			checkpoints.add(stateHandle.f0.getState(userClassLoader));
+		}
+
+		return checkpoints;
+	}
+
+	@Override
+	public int getNumberOfRetainedCheckpoints() {
+		return checkpointStateHandles.size();
+	}
+
+	@Override
+	public void discardAllCheckpoints() throws Exception {
+		for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+			try {
+				removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
+			}
+			catch (Exception e) {
+				LOG.error("Failed to discard checkpoint.", e);
+			}
+		}
+
+		checkpointStateHandles.clear();
+
+		String path = "/" + client.getNamespace();
+
+		LOG.info("Removing {} from ZooKeeper", path);
+		ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+	}
+
+	/**
+	 * Asynchronously removes the ZNode; once it is deleted, discards the checkpoint and its state handle.
+	 */
+	private void removeFromZooKeeperAndDiscardCheckpoint(
+			final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+
+		final BackgroundCallback callback = new BackgroundCallback() {
+			@Override
+			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+				try {
+					if (event.getType() == CuratorEventType.DELETE) {
+						if (event.getResultCode() == 0) {
+							// Discard the checkpoint
+							CompletedCheckpoint checkpoint = stateHandleAndPath
+									.f0.getState(userClassLoader);
+
+							checkpoint.discard(userClassLoader);
+
+							// Discard the state handle
+							stateHandleAndPath.f0.discardState();
+
+							// Both the checkpoint and its state handle are discarded now
+							LOG.debug("Discarded {}", checkpoint);
+						}
+						else {
+							throw new IllegalStateException("Unexpected result code " +
+									event.getResultCode() + " in '" + event + "' callback.");
+						}
+					}
+					else {
+						throw new IllegalStateException("Unexpected event type " +
+								event.getType() + " in '" + event + "' callback.");
+					}
+				}
+				catch (Exception e) {
+					LOG.error("Failed to discard checkpoint.", e);
+				}
+			}
+		};
+
+		// Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
+		// we remove a state handle and fail to remove it from ZooKeeper, we end up in an
+		// inconsistent state.
+		checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ef00484..9430d80 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -39,6 +41,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -110,8 +113,6 @@ public class ExecutionGraph implements Serializable {
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 	
-	private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
-
 	// --------------------------------------------------------------------------------------------
 
 	/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -347,7 +348,11 @@ public class ExecutionGraph implements Serializable {
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
 			ActorSystem actorSystem,
-			UUID leaderSessionID) {
+			UUID leaderSessionID,
+			CheckpointIDCounter checkpointIDCounter,
+			CompletedCheckpointStore completedCheckpointStore,
+			RecoveryMode recoveryMode) throws Exception {
+
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
 			throw new IllegalArgumentException();
@@ -367,12 +372,14 @@ public class ExecutionGraph implements Serializable {
 		snapshotCheckpointsEnabled = true;
 		checkpointCoordinator = new CheckpointCoordinator(
 				jobID,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
 				checkpointTimeout,
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,
-				userClassLoader);
+				userClassLoader,
+				checkpointIDCounter,
+				completedCheckpointStore,
+				recoveryMode);
 		
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
@@ -382,8 +389,14 @@ public class ExecutionGraph implements Serializable {
 						interval,
 						leaderSessionID));
 	}
-	
-	public void disableSnaphotCheckpointing() {
+
+	/**
+	 * Disables checkpointing.
+	 *
+	 * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
+	 * method don't block the job manager actor and run asynchronously.
+	 */
+	public void disableSnaphotCheckpointing() throws Exception {
 		if (state != JobStatus.CREATED) {
 			throw new IllegalStateException("Job must be in CREATED state");
 		}
@@ -773,6 +786,20 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	/**
+	 * Restores the latest checkpointed state, as tracked by the checkpoint coordinator, into all
+	 * vertices of this graph. Does nothing if no checkpoint coordinator has been set up.
+	 *
+	 * <p>The recovery of checkpoints might block, and the progress lock is held for its whole
+	 * duration. Make sure that calls to this method run asynchronously and don't block the job
+	 * manager actor.
+	 */
+	public void restoreLatestCheckpointedState() throws Exception {
+		synchronized (progressLock) {
+			if (checkpointCoordinator == null) {
+				// No coordinator, hence no checkpointed state that could be restored.
+				return;
+			}
+
+			checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+		}
+	}
+
+	/**
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
 	public void prepareForArchiving() {
@@ -886,7 +913,13 @@ public class ExecutionGraph implements Serializable {
 									}
 								}, executionContext);
 							} else {
-								restart();
+								future(new Callable<Object>() {
+									@Override
+									public Object call() throws Exception {
+										restart();
+										return null;
+									}
+								}, executionContext);
 							}
 							break;
 						}
@@ -906,7 +939,7 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
-	
+
 	private void postRunCleanup() {
 		try {
 			CheckpointCoordinator coord = this.checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b36e2d..a64d63c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -538,4 +538,9 @@ public class JobGraph implements Serializable {
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		// Identifies the graph by its ID only, e.g. "JobGraph(jobId: <id>)".
+		return String.format("JobGraph(jobId: %s)", this.jobID);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 2e75b19..17322d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -35,6 +35,19 @@ public enum RecoveryMode {
 	ZOOKEEPER;
 
 	/**
+	 * Return the configured {@link RecoveryMode}.
+	 *
+	 * @param config The config to parse
+	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
+	 * configured.
+	 */
+	public static RecoveryMode fromConfig(Configuration config) {
+		return RecoveryMode.valueOf(config.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
+	}
+
+	/**
 	 * Returns true if the defined recovery mode supports high availability.
 	 *
 	 * @param configuration Configuration which contains the recovery mode

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
new file mode 100644
index 0000000..db36f92
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraphStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ *
+ * <p>Every operation is a no-op. In this recovery mode, submitted {@link JobGraph} instances are
+ * not persisted, so there is nothing to store, remove or recover.
+ */
+public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+	@Override
+	public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+		// No-op: there is no shared store whose changes the listener could be notified about.
+	}
+
+	@Override
+	public void stop() {
+		// No-op: start() acquires nothing that would need releasing.
+	}
+
+	@Override
+	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+		// Nothing is ever persisted, hence nothing can be recovered.
+		return Collections.<SubmittedJobGraph>emptyList();
+	}
+
+	@Override
+	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		return Option.<SubmittedJobGraph>empty();
+	}
+
+	@Override
+	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+		// No-op: job graphs are not persisted in standalone mode.
+	}
+
+	@Override
+	public void removeJobGraph(JobID jobId) throws Exception {
+		// No-op: putJobGraph() never stores anything that could be removed.
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
new file mode 100644
index 0000000..48da3b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A submitted {@link JobGraph} together with the {@link JobInfo} of the client that submitted it.
+ *
+ * <p>Instances are {@link Serializable} so that they can be stored by a
+ * {@link SubmittedJobGraphStore} and recovered later.
+ */
+public class SubmittedJobGraph implements Serializable {
+
+	private static final long serialVersionUID = 2836099271734771825L;
+
+	/** The submitted {@link JobGraph}. */
+	private final JobGraph jobGraph;
+
+	/**
+	 * The {@link JobInfo} of the submitting client.
+	 *
+	 * <p>NOTE(review): Java serialization of this class also requires {@link JobInfo} to be
+	 * serializable; confirm that this holds for the JobInfo implementation.
+	 */
+	private final JobInfo jobInfo;
+
+	/**
+	 * Creates a {@link SubmittedJobGraph}.
+	 *
+	 * @param jobGraph The submitted {@link JobGraph}
+	 * @param jobInfo  The {@link JobInfo} of the client who submitted the job graph
+	 * @throws NullPointerException If either argument is {@code null}
+	 */
+	public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
+		this.jobGraph = checkNotNull(jobGraph, "Job graph");
+		this.jobInfo = checkNotNull(jobInfo, "Job info");
+	}
+
+	/**
+	 * Returns the submitted {@link JobGraph}.
+	 */
+	public JobGraph getJobGraph() {
+		return jobGraph;
+	}
+
+	/**
+	 * Returns the {@link JobID} of the submitted {@link JobGraph}.
+	 */
+	public JobID getJobId() {
+		return jobGraph.getJobID();
+	}
+
+	/**
+	 * Returns the {@link JobInfo} of the client who submitted the {@link JobGraph}.
+	 *
+	 * <p>This is a plain field read that cannot fail, so no checked exception is declared.
+	 */
+	public JobInfo getJobInfo() {
+		return jobInfo;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
new file mode 100644
index 0000000..bd628cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.Option;
+
+import java.util.List;
+
+/**
+ * Stores {@link SubmittedJobGraph} instances so that they can be recovered later.
+ */
+public interface SubmittedJobGraphStore {
+
+	/**
+	 * Starts the {@link SubmittedJobGraphStore} service.
+	 *
+	 * @param jobGraphListener Listener to notify about job graphs that other
+	 *                         {@link SubmittedJobGraphStore} instances add or remove
+	 */
+	void start(SubmittedJobGraphListener jobGraphListener) throws Exception;
+
+	/**
+	 * Stops the {@link SubmittedJobGraphStore} service.
+	 */
+	void stop() throws Exception;
+
+	/**
+	 * Returns a list of all submitted {@link JobGraph} instances.
+	 *
+	 * @return All stored job graphs; empty if none are stored
+	 */
+	List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
+
+	/**
+	 * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
+	 *
+	 * <p>A missing job graph is not an error. It is reported through an empty {@link Option}.
+	 *
+	 * @param jobId The {@link JobID} of the job graph to recover
+	 * @return The recovered job graph, or {@code Option.empty()} if no job graph with the given ID
+	 * exists
+	 * @throws Exception If recovering the job graph fails
+	 */
+	Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+
+	/**
+	 * Adds the {@link SubmittedJobGraph} instance.
+	 *
+	 * <p>If a job graph with the same {@link JobID} exists, it is replaced.
+	 *
+	 * @param jobGraph The job graph to store
+	 */
+	void putJobGraph(SubmittedJobGraph jobGraph) throws Exception;
+
+	/**
+	 * Removes the {@link SubmittedJobGraph} with the given {@link JobID} if it exists.
+	 *
+	 * @param jobId The {@link JobID} of the job graph to remove
+	 */
+	void removeJobGraph(JobID jobId) throws Exception;
+
+	/**
+	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
+	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
+	 */
+	interface SubmittedJobGraphListener {
+
+		/**
+		 * Callback for {@link SubmittedJobGraph} instances added by a different {@link
+		 * SubmittedJobGraphStore} instance.
+		 *
+		 * <p><strong>Important:</strong> False positives are possible: the listener may also be
+		 * notified about a job graph that was added by this instance.
+		 *
+		 * @param jobId The {@link JobID} of the added job graph
+		 */
+		void onAddedJobGraph(JobID jobId);
+
+		/**
+		 * Callback for {@link SubmittedJobGraph} instances removed by a different {@link
+		 * SubmittedJobGraphStore} instance.
+		 *
+		 * @param jobId The {@link JobID} of the removed job graph
+		 */
+		void onRemovedJobGraph(JobID jobId);
+
+	}
+
+}