You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:56 UTC

[05/47] flink git commit: [FLINK-2354] [runtime] Add job graph and checkpoint recovery

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
new file mode 100644
index 0000000..660f8bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each job graph creates a ZNode:
+ * <pre>
+ * +----O /flink/jobgraphs/&lt;job-id&gt; 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/&lt;job-id&gt; N [persistent]
+ * </pre>
+ *
+ * <p>The root path is watched to detect concurrent modifications in corner situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
+
+	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+	private final Object cacheLock = new Object();
+
+	/** Client (not a namespace facade) */
+	private final CuratorFramework client;
+
+	/** The set of IDs of all added job graphs. */
+	private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+	/** Submitted job graphs in ZooKeeper */
+	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
+
+	/**
+	 * Cache to monitor all children. This is used to detect races with other instances working
+	 * on the same state.
+	 */
+	private final PathChildrenCache pathCache;
+
+	/** The external listener to be notified on races. */
+	private SubmittedJobGraphListener jobGraphListener;
+
+	/** Flag indicating whether this instance is running. */
+	private boolean isRunning;
+
+	public ZooKeeperSubmittedJobGraphStore(
+			CuratorFramework client,
+			String currentJobsPath,
+			StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+
+		checkNotNull(currentJobsPath, "Current jobs path");
+		checkNotNull(stateHandleProvider, "State handle provider");
+
+		// The unwrapped client is the one closed in stop(); the namespace facade created
+		// below cannot be closed.
+		this.client = checkNotNull(client, "Curator client");
+
+		// Create the job graphs root path if it is missing
+		client.newNamespaceAwareEnsurePath(currentJobsPath)
+				.ensure(client.getZookeeperClient());
+
+		// Facade rooted at the job graphs path, shared by the handle store and the cache
+		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateHandleProvider);
+
+		this.pathCache = new PathChildrenCache(facade, "/", false);
+		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
+	}
+
+	@Override
+	public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+		synchronized (cacheLock) {
+			// Already running: keep the current listener and do nothing.
+			if (isRunning) {
+				return;
+			}
+			this.jobGraphListener = jobGraphListener;
+			pathCache.start();
+			isRunning = true;
+		}
+	}
+
+	@Override
+	public void stop() throws Exception {
+		synchronized (cacheLock) {
+			// Not running: nothing to release.
+			if (!isRunning) {
+				return;
+			}
+			jobGraphListener = null;
+			pathCache.close();
+			// This is the original client, not the namespace facade.
+			client.close();
+			isRunning = false;
+		}
+	}
+
+	@Override
+	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+		synchronized (cacheLock) {
+			verifyIsRunning();
+
+			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> handles;
+
+			// Read all handles, retrying whenever the read hits a concurrent modification.
+			for (;;) {
+				try {
+					handles = jobGraphsInZooKeeper.getAll();
+					break;
+				}
+				catch (ConcurrentModificationException e) {
+					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+				}
+			}
+
+			if (handles.isEmpty()) {
+				LOG.info("No job graph to recover.");
+				return Collections.emptyList();
+			}
+
+			List<SubmittedJobGraph> recovered = new ArrayList<>(handles.size());
+
+			for (Tuple2<StateHandle<SubmittedJobGraph>, String> handle : handles) {
+				// Resolve the state with the system class loader and remember the ID as known.
+				SubmittedJobGraph jobGraph = handle.f0.getState(ClassLoader.getSystemClassLoader());
+
+				addedJobGraphs.add(jobGraph.getJobId());
+				recovered.add(jobGraph);
+			}
+
+			LOG.info("Recovered {} job graphs: {}.", recovered.size(), recovered);
+
+			return recovered;
+		}
+	}
+
+	@Override
+	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+
+		synchronized (cacheLock) {
+			verifyIsRunning();
+
+			Option<SubmittedJobGraph> result;
+
+			try {
+				SubmittedJobGraph recovered = jobGraphsInZooKeeper.get(path)
+						.getState(ClassLoader.getSystemClassLoader());
+				addedJobGraphs.add(recovered.getJobId());
+				LOG.info("Recovered {}.", recovered);
+				result = Option.apply(recovered);
+			}
+			catch (KeeperException.NoNodeException ignored) {
+				// No node for this job: report "nothing to recover" instead of failing.
+				result = Option.empty();
+			}
+
+			return result;
+		}
+	}
+
+	@Override
+	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+		checkNotNull(jobGraph, "Job graph");
+		String path = getPathForJob(jobGraph.getJobId());
+		// Each attempt re-checks the node under the lock; a lost race leads to another attempt.
+		boolean success = false;
+
+		while (!success) {
+			synchronized (cacheLock) {
+				verifyIsRunning();
+				// A version of -1 is handled as "no node at this path yet": add instead of replace.
+				int currentVersion = jobGraphsInZooKeeper.exists(path);
+
+				if (currentVersion == -1) {
+					try {
+						jobGraphsInZooKeeper.add(path, jobGraph);
+						// Remember the ID: only graphs known to this instance may be replaced.
+						addedJobGraphs.add(jobGraph.getJobId());
+
+						LOG.info("Added {} to ZooKeeper.", jobGraph);
+
+						success = true;
+					}
+					catch (KeeperException.NodeExistsException ignored) { // node showed up meanwhile; retry
+					}
+				}
+				else if (addedJobGraphs.contains(jobGraph.getJobId())) {
+					try {
+						jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
+						LOG.info("Updated {} in ZooKeeper.", jobGraph);
+
+						success = true;
+					}
+					catch (KeeperException.NoNodeException ignored) { // node vanished meanwhile; retry
+					}
+				}
+				else {
+					throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " +
+							"#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
+				}
+			}
+		}
+	}
+
+	@Override
+	public void removeJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+		synchronized (cacheLock) {
+			// Graphs this instance never added or recovered are left untouched.
+			if (!addedJobGraphs.contains(jobId)) {
+				return;
+			}
+			jobGraphsInZooKeeper.removeAndDiscardState(path);
+			addedJobGraphs.remove(jobId);
+			LOG.info("Removed job graph {} from ZooKeeper.", jobId);
+		}
+	}
+
+	/**
+	 * Monitors ZooKeeper for changes.
+	 *
+	 * <p>Detects modifications from other job managers in corner situations. The event
+	 * notifications fire for changes from this job manager as well.
+	 */
+	private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener {
+
+		/** Reports foreign additions/removals to the listener and logs connection changes. */
+		@Override
+		public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+				throws Exception {
+
+			if (LOG.isDebugEnabled()) {
+				if (event.getData() != null) {
+					LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
+				}
+				else {
+					LOG.debug("Received {} event", event.getType());
+				}
+			}
+
+			switch (event.getType()) {
+				case CHILD_ADDED:
+					synchronized (cacheLock) {
+						try {
+							JobID jobId = fromEvent(event);
+							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
+								try {
+									// Whoa! This has been added by someone else. Or we were fast
+									// to remove it (false positive).
+									jobGraphListener.onAddedJobGraph(jobId);
+								}
+								catch (Throwable t) {
+									LOG.error("Error in callback", t);
+								}
+							}
+						}
+						catch (Exception e) {
+							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
+						}
+					}
+					break;
+
+				case CHILD_UPDATED:
+					// Nothing to do
+					break;
+
+				case CHILD_REMOVED:
+					synchronized (cacheLock) {
+						try {
+							JobID jobId = fromEvent(event);
+							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
+								try {
+									// Oh oh. Someone else removed one of our job graphs. Mean!
+									jobGraphListener.onRemovedJobGraph(jobId);
+								}
+								catch (Throwable t) {
+									LOG.error("Error in callback", t);
+								}
+							}
+						}
+						catch (Exception e) {
+							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
+						}
+					}
+					break;
+
+				case CONNECTION_SUSPENDED:
+					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
+							"graphs are not monitored (temporarily).");
+					break; // no fall-through: SUSPENDED must not also be reported as LOST
+
+				case CONNECTION_LOST:
+					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
+							"graphs are not monitored (permanently).");
+					break;
+
+				case CONNECTION_RECONNECTED:
+					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
+							"graphs are monitored again.");
+					break; // no fall-through: a reconnect is not an initialization
+
+				case INITIALIZED:
+					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
+					break;
+			}
+		}
+
+		/** Derives the {@link JobID} from the node name of the event's path. */
+		private JobID fromEvent(PathChildrenCacheEvent event) {
+			String eventPath = event.getData().getPath();
+			String nodeName = ZKPaths.getNodeFromPath(eventPath);
+			return JobID.fromHexString(nodeName);
+		}
+	}
+
+	/**
+	 * Verifies that this store is running, i.e. started and not yet stopped; throws otherwise.
+	 */
+	private void verifyIsRunning() {
+		checkState(isRunning, "Not running. Forgot to call start()?");
+	}
+
+	/**
+	 * Returns the ZooKeeper path of a job: its ID as a String with a leading slash.
+	 */
+	public static String getPathForJob(JobID jobId) {
+		checkNotNull(jobId, "Job ID");
+		return "/" + jobId;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index b6223ee..6cba141 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -67,4 +67,5 @@ public interface LeaderElectionService {
 	 * @return true if the associated {@link LeaderContender} is the leader, otherwise false
 	 */
 	boolean hasLeadership();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index ae3f0e6..811037c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -43,6 +43,7 @@ import java.util.UUID;
  * ZooKeeper as well.
  */
 public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
+
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
 
 	/** Client to the ZooKeeper quorum */

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..7aa1ccf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+public enum StateBackend {
+	JOBMANAGER, FILESYSTEM;
+
+	/**
+	 * Returns the configured {@link StateBackend} (the name is matched case-insensitively).
+	 *
+	 * @param config The config to parse
+	 * @return Configured backend, or {@link ConfigConstants#DEFAULT_STATE_BACKEND} if not set
+	 * @throws IllegalArgumentException if the configured name matches no backend
+	 */
+	public static StateBackend fromConfig(Configuration config) {
+		// Locale.ROOT: upper-casing must not depend on the JVM's default locale
+		return StateBackend.valueOf(config.getString(ConfigConstants.STATE_BACKEND,
+				ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(java.util.Locale.ROOT));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
new file mode 100644
index 0000000..0086ac6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import java.io.Serializable;
+
+/**
+ * State handler provider factory.
+ *
+ * <p>This is going to be superseded soon.
+ */
+public class StateHandleProviderFactory {
+
+	/**
+	 * Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
+	 * the configured recovery path.
+	 *
+	 * @throws IllegalConfigurationException if the backend is not FILESYSTEM or no path is set
+	 */
+	public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider(
+			Configuration config) {
+
+		StateBackend stateBackend = StateBackend.fromConfig(config);
+
+		if (stateBackend != StateBackend.FILESYSTEM) {
+			throw new IllegalConfigurationException("Unexpected state backend configuration " +
+					stateBackend);
+		}
+
+		String recoveryPath = config.getString(
+				ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+		if (recoveryPath.isEmpty()) {
+			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
+					"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+		}
+
+		return FileStateHandle.createProvider(recoveryPath);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
deleted file mode 100644
index 5f867a5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
-
-/**
- * Utility class to help working with {@link LeaderElectionService} class.
- */
-public final class LeaderElectionUtils {
-
-	/**
-	 * Creates a {@link LeaderElectionService} based on the provided {@link Configuration} object.
-	 *
-	 * @param configuration Configuration object
-	 * @return {@link LeaderElectionService} which was created based on the provided Configuration
-	 * @throws Exception
-	 */
-	public static LeaderElectionService createLeaderElectionService(Configuration configuration) throws Exception {
-		RecoveryMode recoveryMode = RecoveryMode.valueOf(configuration.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase()
-		);
-
-		LeaderElectionService leaderElectionService;
-
-		switch(recoveryMode) {
-			case STANDALONE:
-				leaderElectionService = new StandaloneLeaderElectionService();
-				break;
-			case ZOOKEEPER:
-				leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
-				break;
-			default:
-				throw new Exception("Unknown RecoveryMode " + recoveryMode);
-		}
-
-		return leaderElectionService;
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private LeaderElectionUtils() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index d2d3945..79b9b7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -21,19 +21,27 @@ package org.apache.flink.runtime.util;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.state.StateHandleProviderFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Utility class to work with Apache Zookeeper for Flink runtime.
- */
-public final class ZooKeeperUtils {
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ZooKeeperUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
 
@@ -47,8 +55,10 @@ public final class ZooKeeperUtils {
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
 		String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
 
-		if(zkQuorum == null || zkQuorum.equals("")) {
-			throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
+		if (zkQuorum == null || zkQuorum.equals("")) {
+			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
+					"You can specify the quorum via the configuration key '" +
+					ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'.");
 		}
 
 		int sessionTimeout = configuration.getInteger(
@@ -59,7 +69,7 @@ public final class ZooKeeperUtils {
 				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
 				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
 
-		int retryWait = configuration.getInteger (
+		int retryWait = configuration.getInteger(
 				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
 				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
 
@@ -88,14 +98,10 @@ public final class ZooKeeperUtils {
 	}
 
 	/**
-	 * Returns whether high availability is enabled (<=> ZooKeeper quorum configured).
+	 * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
 	 */
-	public static boolean isZooKeeperHighAvailabilityEnabled(Configuration flinkConf) {
-		String recoveryMode = flinkConf.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
-		return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name());
+	public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
+		return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
 	}
 
 	/**
@@ -125,7 +131,7 @@ public final class ZooKeeperUtils {
 	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration) throws Exception{
+			Configuration configuration) throws Exception {
 		CuratorFramework client = startCuratorFramework(configuration);
 		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
 				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
@@ -134,7 +140,8 @@ public final class ZooKeeperUtils {
 	}
 
 	/**
-	 * Creates a {@link ZooKeeperLeaderElectionService} instance.
+	 * Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link
+	 * CuratorFramework} client.
 	 *
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @return {@link ZooKeeperLeaderElectionService} instance.
@@ -142,8 +149,24 @@ public final class ZooKeeperUtils {
 	 */
 	public static ZooKeeperLeaderElectionService createLeaderElectionService(
 			Configuration configuration) throws Exception {
+
 		CuratorFramework client = startCuratorFramework(configuration);
 
+		return createLeaderElectionService(client, configuration);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderElectionService} instance.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @return {@link ZooKeeperLeaderElectionService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderElectionService createLeaderElectionService(
+			CuratorFramework client,
+			Configuration configuration) throws Exception {
+
 		String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
 				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
 		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
@@ -153,6 +176,89 @@ public final class ZooKeeperUtils {
 	}
 
 	/**
+	 * Creates a {@link ZooKeeperSubmittedJobGraphStore} instance.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object
+	 * @return {@link ZooKeeperSubmittedJobGraphStore} instance
+	 */
+	public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
+			CuratorFramework client,
+			Configuration configuration) throws Exception {
+
+		checkNotNull(configuration, "Configuration");
+
+		// Job graphs are persisted through the recovery file state handle provider
+		StateHandleProvider<SubmittedJobGraph> jobGraphHandles =
+				StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+
+		// Root path below which the job graphs are stored in ZooKeeper
+		String jobGraphsPath = configuration.getString(
+				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+		return new ZooKeeperSubmittedJobGraphStore(client, jobGraphsPath, jobGraphHandles);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance for a single job.
+	 *
+	 * @param client                         The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration                  {@link Configuration} object
+	 * @param jobId                          ID of job to create the instance for
+	 * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
+	 * @param userClassLoader                User code class loader
+	 * @return {@link ZooKeeperCompletedCheckpointStore} instance
+	 */
+	public static CompletedCheckpointStore createCompletedCheckpoints(
+			CuratorFramework client,
+			Configuration configuration,
+			JobID jobId,
+			int maxNumberOfCheckpointsToRetain,
+			ClassLoader userClassLoader) throws Exception {
+
+		checkNotNull(configuration, "Configuration");
+
+		StateHandleProvider<CompletedCheckpoint> checkpointHandles =
+				StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+		// Per-job checkpoints path: configured root path followed by the job's sub path
+		String checkpointsRootPath = configuration.getString(
+				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+		String jobCheckpointsPath =
+				checkpointsRootPath + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+		return new ZooKeeperCompletedCheckpointStore(
+				maxNumberOfCheckpointsToRetain,
+				userClassLoader,
+				client,
+				jobCheckpointsPath,
+				checkpointHandles);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperCheckpointIDCounter} instance for a single job.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object
+	 * @param jobId         ID of job to create the instance for
+	 * @return {@link ZooKeeperCheckpointIDCounter} instance
+	 */
+	public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
+			CuratorFramework client,
+			Configuration configuration,
+			JobID jobId) throws Exception {
+
+		// Per-job counter path: configured root path followed by the job's sub path
+		String counterRootPath = configuration.getString(
+				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String jobCounterPath = counterRootPath + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+		return new ZooKeeperCheckpointIDCounter(client, jobCounterPath);
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private ZooKeeperUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
new file mode 100644
index 0000000..936fe1b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
+ *
+ * <p>State modifications require some care, because it is possible that certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * <p>ZooKeeper holds the ground truth about state handles, i.e. the following holds:
+ *
+ * <pre>
+ * State handle in ZooKeeper => State handle exists
+ * </pre>
+ *
+ * But not:
+ *
+ * <pre>
+ * State handle exists => State handle in ZooKeeper
+ * </pre>
+ *
+ * There can be lingering state handles when failures happen during operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+	/** Curator ZooKeeper client */
+	private final CuratorFramework client;
+
+	/** State handle provider */
+	private final StateHandleProvider<T> stateHandleProvider;
+
+	/**
+	 * Creates a {@link ZooKeeperStateHandleStore} on top of the given client and provider.
+	 *
+	 * @param client              The Curator ZooKeeper client. <strong>Important:</strong> It is
+	 *                            expected that the client's namespace ensures that the root
+	 *                            path is exclusive for all state handles managed by this
+	 *                            instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+	 * @param stateHandleProvider The state handle provider for the state
+	 * @throws NullPointerException If either argument is <code>null</code>
+	 */
+	public ZooKeeperStateHandleStore(
+			CuratorFramework client,
+			StateHandleProvider<T> stateHandleProvider) {
+
+		// Validate both arguments up front, then wire the fields.
+		checkNotNull(client, "Curator client");
+		checkNotNull(stateHandleProvider, "State handle provider");
+
+		this.client = client;
+		this.stateHandleProvider = stateHandleProvider;
+	}
+
+	/**
+	 * Convenience variant of {@link #add(String, Serializable, CreateMode)} that always uses
+	 * create mode {@link CreateMode#PERSISTENT}.
+	 *
+	 * @see #add(String, Serializable, CreateMode)
+	 */
+	public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
+		final CreateMode persistentMode = CreateMode.PERSISTENT;
+
+		return add(pathInZooKeeper, state, persistentMode);
+	}
+
+	/**
+	 * Creates a state handle for the given state and stores that handle in ZooKeeper.
+	 *
+	 * <p><strong>Important</strong>: The actual state is <em>not</em> stored in ZooKeeper. Only
+	 * the serialized state handle is written there, which keeps the data in ZooKeeper small.
+	 *
+	 * <p>If serializing the handle or writing it to ZooKeeper fails, the freshly created state
+	 * handle is discarded again before the failure is propagated.
+	 *
+	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
+	 *                        start with a '/')
+	 * @param state           State to be added
+	 * @param createMode      The create mode for the new path in ZooKeeper
+	 * @return Created {@link StateHandle}
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+		checkNotNull(state, "State");
+
+		// Create the state handle. Nothing persisted yet.
+		final StateHandle<T> handle = stateHandleProvider.createStateHandle(state);
+
+		try {
+			// Serialize the state handle. This writes the state to the backend.
+			final byte[] handleBytes = InstantiationUtil.serializeObject(handle);
+
+			// Only the (small) handle goes to ZooKeeper, which is designed for data in the KB
+			// range, whereas the state itself can be larger.
+			client.create().withMode(createMode).forPath(pathInZooKeeper, handleBytes);
+		}
+		catch (Throwable t) {
+			// The handle never made it to ZooKeeper: clean it up and propagate the failure.
+			if (handle != null) {
+				handle.discardState();
+			}
+
+			throw t;
+		}
+
+		return handle;
+	}
+
+	/**
+	 * Swaps the state handle stored at the given path for a handle of the new state.
+	 *
+	 * <p>On success the previously stored state handle is discarded. If serializing or writing
+	 * the new handle fails, the new handle is discarded instead and the old one is left alone.
+	 *
+	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/')
+	 * @param expectedVersion Expected version of the node to replace
+	 * @param state           The new state to replace the old one
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+		checkNotNull(state, "State");
+
+		final StateHandle<T> previousHandle = get(pathInZooKeeper);
+		final StateHandle<T> replacementHandle = stateHandleProvider.createStateHandle(state);
+
+		try {
+			// Serialize the new state handle. This writes the state to the backend.
+			final byte[] replacementBytes = InstantiationUtil.serializeObject(replacementHandle);
+
+			// Versioned write of the new handle to ZooKeeper.
+			client.setData()
+					.withVersion(expectedVersion)
+					.forPath(pathInZooKeeper, replacementBytes);
+		}
+		catch (Throwable t) {
+			// The replacement was not written: drop it and propagate the failure.
+			replacementHandle.discardState();
+
+			throw t;
+		}
+
+		// The replacement is in ZooKeeper now, so the old handle is no longer referenced.
+		previousHandle.discardState();
+	}
+
+	/**
+	 * Looks up the version of the node at the given path.
+	 *
+	 * @param pathInZooKeeper Path in ZooKeeper to check
+	 * @return Version of the ZNode if the path exists, <code>-1</code> otherwise.
+	 * @throws Exception If the ZooKeeper operation fails
+	 */
+	public int exists(String pathInZooKeeper) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+		final Stat nodeStat = client.checkExists().forPath(pathInZooKeeper);
+
+		// A null Stat means that there is no node at the path.
+		return nodeStat == null ? -1 : nodeStat.getVersion();
+	}
+
+	/**
+	 * Reads the node at the given path and deserializes its data into a state handle.
+	 *
+	 * <p>Deserialization uses the system class loader.
+	 *
+	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
+	 *                        exist and start with a '/').
+	 * @return The state handle
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	@SuppressWarnings("unchecked")
+	public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+		final byte[] serializedHandle = client.getData().forPath(pathInZooKeeper);
+		final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+
+		final Object deserialized =
+				InstantiationUtil.deserializeObject(serializedHandle, classLoader);
+
+		return (StateHandle<T>) deserialized;
+	}
+
+	/**
+	 * Gets all available state handles from ZooKeeper.
+	 *
+	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+	 * Every attempt starts from an empty result, so a retry never reports a handle twice.
+	 *
+	 * @return All state handles from ZooKeeper together with their paths, or an empty list if
+	 *         the root node does not exist.
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	@SuppressWarnings("unchecked")
+	public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
+		final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+
+		boolean success = false;
+
+		retry:
+		while (!success) {
+			// Drop the partial result of a previous, aborted attempt. Without this, handles
+			// collected before a retry would show up more than once in the result.
+			stateHandles.clear();
+
+			Stat initialStat = client.checkExists().forPath("/");
+
+			if (initialStat == null) {
+				// The root node does not exist, hence there are no state handles.
+				break;
+			}
+
+			// Initial cVersion (number of changes to the children of this node)
+			int initialCVersion = initialStat.getCversion();
+
+			List<String> children = client.getChildren().forPath("/");
+
+			for (String path : children) {
+				path = "/" + path;
+
+				try {
+					final StateHandle<T> stateHandle = get(path);
+					stateHandles.add(new Tuple2<>(stateHandle, path));
+				}
+				catch (KeeperException.NoNodeException ignored) {
+					// Concurrent deletion, retry
+					continue retry;
+				}
+			}
+
+			Stat finalStat = client.checkExists().forPath("/");
+
+			// Check for concurrent modifications. A root node that vanished in the meantime
+			// counts as a modification as well and is picked up by the next iteration.
+			success = finalStat != null && initialCVersion == finalStat.getCversion();
+		}
+
+		return stateHandles;
+	}
+
+	/**
+	 * Gets all available state handles from ZooKeeper sorted by name (ascending).
+	 *
+	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+	 * Every attempt starts from an empty result, so a retry never reports a handle twice.
+	 *
+	 * @return All state handles in ZooKeeper together with their paths, or an empty list if
+	 *         the root node does not exist.
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	@SuppressWarnings("unchecked")
+	public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
+		final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+
+		boolean success = false;
+
+		retry:
+		while (!success) {
+			// Drop the partial result of a previous, aborted attempt. Without this, handles
+			// collected before a retry would show up more than once in the result.
+			stateHandles.clear();
+
+			Stat initialStat = client.checkExists().forPath("/");
+
+			if (initialStat == null) {
+				// The root node does not exist, hence there are no state handles.
+				break;
+			}
+
+			// Initial cVersion (number of changes to the children of this node)
+			int initialCVersion = initialStat.getCversion();
+
+			// ZKPaths works on the raw ZooKeeper client, so the namespace is applied manually.
+			List<String> children = ZKPaths.getSortedChildren(
+					client.getZookeeperClient().getZooKeeper(),
+					ZKPaths.fixForNamespace(client.getNamespace(), "/"));
+
+			for (String path : children) {
+				path = "/" + path;
+
+				try {
+					final StateHandle<T> stateHandle = get(path);
+					stateHandles.add(new Tuple2<>(stateHandle, path));
+				}
+				catch (KeeperException.NoNodeException ignored) {
+					// Concurrent deletion, retry
+					continue retry;
+				}
+			}
+
+			Stat finalStat = client.checkExists().forPath("/");
+
+			// Check for concurrent modifications. A root node that vanished in the meantime
+			// counts as a modification as well and is picked up by the next iteration.
+			success = finalStat != null && initialCVersion == finalStat.getCversion();
+		}
+
+		return stateHandles;
+	}
+
+	/**
+	 * Deletes the node at the given path (including its children, if any) from ZooKeeper.
+	 *
+	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 *
+	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+	 * @throws Exception If the ZooKeeper operation fails
+	 */
+	public void remove(String pathInZooKeeper) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+		client.delete()
+				.deletingChildrenIfNeeded()
+				.forPath(pathInZooKeeper);
+	}
+
+	/**
+	 * Deletes the node at the given path (including its children, if any) from ZooKeeper in
+	 * the background and hands the outcome to the given callback.
+	 *
+	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 *
+	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+	 * @param callback        The callback after the operation finishes
+	 * @throws Exception If the ZooKeeper operation fails
+	 */
+	public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+		checkNotNull(callback, "Background callback");
+
+		client.delete()
+				.deletingChildrenIfNeeded()
+				.inBackground(callback)
+				.forPath(pathInZooKeeper);
+	}
+
+	/**
+	 * Removes a state handle from ZooKeeper and discards it afterwards.
+	 *
+	 * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
+	 *
+	 * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
+	 * @throws Exception If the ZooKeeper or state handle operation fails
+	 */
+	public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
+		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+		// Fetch the handle while its node still exists.
+		final StateHandle<T> handleToDiscard = get(pathInZooKeeper);
+
+		// The order matters: ZooKeeper first, the state afterwards. Discarding first could
+		// leave a handle in ZooKeeper whose state is already gone if the deletion then fails.
+		client.delete()
+				.deletingChildrenIfNeeded()
+				.forPath(pathInZooKeeper);
+
+		handleToDiscard.discardState();
+	}
+
+	/**
+	 * Removes every state handle below the root from ZooKeeper and discards them afterwards.
+	 *
+	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 */
+	public void removeAndDiscardAllState() throws Exception {
+		final List<Tuple2<StateHandle<T>, String>> handlesAndPaths = getAll();
+
+		// Delete all children of the (namespaced) root, but keep the root node itself.
+		ZKPaths.deleteChildren(
+				client.getZookeeperClient().getZooKeeper(),
+				ZKPaths.fixForNamespace(client.getNamespace(), "/"),
+				false);
+
+		// The state is only discarded once the handles are gone from ZooKeeper.
+		for (int i = 0; i < handlesAndPaths.size(); i++) {
+			handlesAndPaths.get(i).f0.discardState();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 75ad20f..67d7a06 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorRef
-
+import org.apache.flink.runtime.akka.ListeningBehaviour
 
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
@@ -27,11 +27,20 @@ import akka.actor.ActorRef
  * Additionally, it stores whether the job was started in the detached mode. Detached means that
  * the submitting actor does not wait for the job result once the job has terminated.
  *
+ * Important: This class is serializable, but needs to be deserialized in the context of an actor
+ * system in order to resolve the client [[ActorRef]]. It is possible to serialize the Akka URL
+ * manually, but it is cumbersome and complicates testing in certain scenarios, where you need to
+ * make sure to resolve the correct [[ActorRef]]s when submitting jobs (RepointableActorRef vs.
+ * RemoteActorRef).
+ *
  * @param client Actor which submitted the job
  * @param start Starting time
  */
-class JobInfo(val client: ActorRef, val start: Long,
-              val sessionTimeout: Long) {
+class JobInfo(
+  val client: ActorRef,
+  val listeningBehaviour: ListeningBehaviour,
+  val start: Long,
+  val sessionTimeout: Long) extends Serializable {
 
   var sessionAlive = sessionTimeout > 0
 
@@ -49,12 +58,16 @@ class JobInfo(val client: ActorRef, val start: Long,
     }
   }
 
+  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
+
   def setLastActive() =
     lastActive = System.currentTimeMillis()
 }
 
 object JobInfo{
-  def apply(client: ActorRef, start: Long,
-            sessionTimeout: Long) =
-    new JobInfo(client, start, sessionTimeout)
+  def apply(
+    client: ActorRef,
+    listeningBehaviour: ListeningBehaviour,
+    start: Long,
+    sessionTimeout: Long) = new JobInfo(client, listeningBehaviour, start, sessionTimeout)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95637bb..f3e4054 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,31 +19,39 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.lang.reflect.{InvocationTargetException, Constructor}
+import java.lang.reflect.{Constructor, InvocationTargetException}
 import java.net.InetSocketAddress
 import java.util.UUID
 
 import akka.actor.Status.Failure
-import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem}
+import akka.actor._
 import akka.pattern.ask
-
 import grizzled.slf4j.Logger
-
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.checkpoint.{CheckpointRecoveryFactory, StandaloneCheckpointRecoveryFactory, ZooKeeperCheckpointRecoveryFactory}
 import org.apache.flink.runtime.client._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
-import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
@@ -51,25 +59,16 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter}
-import org.apache.flink.runtime.LogMessages
-import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
+import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
 
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
 
 
 /**
@@ -110,17 +109,22 @@ class JobManager(
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
     protected val mode: StreamingMode,
-    protected val leaderElectionService: LeaderElectionService)
+    protected val leaderElectionService: LeaderElectionService,
+    protected val submittedJobGraphs : SubmittedJobGraphStore,
+    protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends FlinkActor 
   with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
-  with LeaderContender {
+  with LeaderContender
+  with SubmittedJobGraphListener {
 
   override val log = Logger(getClass)
 
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
+  protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+
   var leaderSessionID: Option[UUID] = None
 
   /**
@@ -138,6 +142,22 @@ class JobManager(
           "start.", e)
         throw new RuntimeException("Could not start the leader election service.", e)
     }
+
+    try {
+      submittedJobGraphs.start(this)
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the submitted job graphs service.", e)
+        throw new RuntimeException("Could not start the submitted job graphs service.", e)
+    }
+
+    try {
+      checkpointRecoveryFactory.start()
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the checkpoint recovery service.", e)
+        throw new RuntimeException("Could not start the checkpoint recovery service.", e)
+    }
   }
 
   override def postStop(): Unit = {
@@ -159,6 +179,18 @@ class JobManager(
       case e: Exception => log.error("Could not properly shutdown the leader election service.")
     }
 
+    try {
+      submittedJobGraphs.stop()
+    } catch {
+      case e: Exception => log.error("Could not properly stop the submitted job graphs service.")
+    }
+
+    try {
+      checkpointRecoveryFactory.stop()
+    } catch {
+      case e: Exception => log.error("Could not properly stop the checkpoint recovery service.")
+    }
+
     if (archive != ActorRef.noSender) {
       archive ! decorateMessage(PoisonPill)
     }
@@ -191,12 +223,21 @@ class JobManager(
       // confirming the leader session ID might be blocking, thus do it in a future
       future{
         leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)
+
+        // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
+        // managers etc.)
+        if (recoveryMode != RecoveryMode.STANDALONE) {
+          context.system.scheduler.scheduleOnce(new FiniteDuration(delayBetweenRetries,
+            MILLISECONDS), self, decorateMessage(RecoverAllJobs))(context.dispatcher)
+        }
       }(context.dispatcher)
 
     case RevokeLeadership =>
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
 
-      cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+      future {
+        cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+      }(context.dispatcher)
 
       // disconnect the registered task managers
       instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -269,7 +310,62 @@ class JobManager(
       sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
 
     case SubmitJob(jobGraph, listeningBehaviour) =>
-      submitJob(jobGraph, listeningBehaviour)
+      val client = sender()
+
+      val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
+        jobGraph.getSessionTimeout)
+
+      future {
+        submitJob(jobGraph, jobInfo)
+      }(context.dispatcher)
+
+    case RecoverJob(jobId) =>
+      future {
+        // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+        // scope of an actor system.
+        akka.serialization.JavaSerializer.currentSystem.withValue(
+          context.system.asInstanceOf[ExtendedActorSystem]) {
+
+          log.info(s"Attempting to recover job $jobId.")
+
+          val jobGraph = submittedJobGraphs.recoverJobGraph(jobId)
+
+          if (jobGraph.isDefined) {
+            if (!leaderElectionService.hasLeadership()) {
+              // we've lost leadership. mission: abort.
+              log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+            }
+            else {
+              recoverJobGraph(jobGraph.get)
+            }
+          }
+          else {
+            log.warn(s"Failed to recover job graph ${jobId}.")
+          }
+        }
+      }(context.dispatcher)
+
+    case RecoverAllJobs =>
+      future {
+        // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+        // scope of an actor system.
+        akka.serialization.JavaSerializer.currentSystem.withValue(
+          context.system.asInstanceOf[ExtendedActorSystem]) {
+
+          val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+
+          if (!leaderElectionService.hasLeadership()) {
+            // we've lost leadership. mission: abort.
+            log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
+              s"jobs.")
+          }
+          else {
+            log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
+
+            jobGraphs.foreach(recoverJobGraph(_))
+          }
+        }
+      }(context.dispatcher)
 
     case CancelJob(jobID) =>
       log.info(s"Trying to cancel job with ID $jobID.")
@@ -377,10 +473,27 @@ class JobManager(
           if (newJobStatus.isTerminalState()) {
             jobInfo.end = timeStamp
 
-            // is the client waiting for the job result?
-            if (jobInfo.client != ActorRef.noSender) {
-              newJobStatus match {
-                case JobStatus.FINISHED =>
+            future {
+              // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will
+              // linger around and potentially be recovered at a later time. There is nothing we
+              // can do about that, but it should be communicated with the Client.
+              if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    removeJob(jobID)
+                  }
+                }
+              } else {
+                removeJob(jobID)
+              }
+
+              // is the client waiting for the job result?
+              if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
+                newJobStatus match {
+                  case JobStatus.FINISHED =>
                   try {
                     val accumulatorResults = executionGraph.getAccumulatorsSerialized()
                     val result = new SerializedJobExecutionResult(
@@ -398,47 +511,37 @@ class JobManager(
                       jobInfo.client ! decorateMessage(JobResultFailure(
                         new SerializedThrowable(exception)))
                   }
-                case JobStatus.CANCELED =>
-                  // the error may be packed as a serialized throwable
-                  val unpackedError = SerializedThrowable.get(
-                    error, executionGraph.getUserClassLoader())
-
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(
-                      new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
-
-                case JobStatus.FAILED =>
-                  val unpackedError = SerializedThrowable.get(
-                    error, executionGraph.getUserClassLoader())
-
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(
-                      new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
-
-                case x =>
-                  val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
-                  jobInfo.client ! decorateMessage(JobResultFailure(
-                    new SerializedThrowable(exception)))
-                  throw exception
-              }
-            }
 
-            if (jobInfo.sessionAlive) {
-              jobInfo.setLastActive()
-              val lastActivity = jobInfo.lastActive
-              context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
-                // remove only if no activity occurred in the meantime
-                if (lastActivity == jobInfo.lastActive) {
-                  removeJob(jobID)
+                  case JobStatus.CANCELED =>
+                    // the error may be packed as a serialized throwable
+                    val unpackedError = SerializedThrowable.get(
+                      error, executionGraph.getUserClassLoader())
+
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(
+                        new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
+
+                  case JobStatus.FAILED =>
+                    val unpackedError = SerializedThrowable.get(
+                      error, executionGraph.getUserClassLoader())
+
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(
+                        new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
+
+                  case x =>
+                    val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
+                    jobInfo.client ! decorateMessage(JobResultFailure(
+                      new SerializedThrowable(exception)))
+                    throw exception
                 }
               }
-            } else {
-              removeJob(jobID)
-            }
-
+            }(context.dispatcher)
           }
         case None =>
-          removeJob(jobID)
+          future {
+            removeJob(jobID)
+          }(context.dispatcher)
       }
 
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -600,11 +703,12 @@ class JobManager(
    * graph and the execution vertices are queued for scheduling.
    *
    * @param jobGraph representing the Flink job
-   * @param listeningBehaviour specifies the listening behaviour of the sender.
+   * @param jobInfo the job info
+   * @param isRecovery Flag indicating whether this is a recovery or initial submission
    */
-  private def submitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour): Unit = {
+  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
     if (jobGraph == null) {
-      sender() ! decorateMessage(JobResultFailure(
+      jobInfo.client ! decorateMessage(JobResultFailure(
         new SerializedThrowable(
           new JobSubmissionException(null, "JobGraph must not be null.")
         )
@@ -615,7 +719,7 @@ class JobManager(
       val jobName = jobGraph.getName
       var executionGraph: ExecutionGraph = null
 
-      log.info(s"Received job ${jobId} (${jobName}).")
+      log.info(s"Submitting job $jobId ($jobName)" + (if (isRecovery) " (Recovery)" else "") + ".")
 
       try {
         // Important: We need to make sure that the library registration is the first action,
@@ -628,7 +732,7 @@ class JobManager(
         catch {
           case t: Throwable =>
             throw new JobSubmissionException(jobId,
-            "Cannot set up the user code libraries: " + t.getMessage, t)
+              "Cannot set up the user code libraries: " + t.getMessage, t)
         }
 
         val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
@@ -641,18 +745,10 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) {
-          // The client does not want to receive the SerializedJobExecutionResult
-          ActorRef.noSender
-        } else {
-          // Send the job execution result back to the sender
-          sender
-        }
-
         // see if there already exists an ExecutionGraph for the corresponding job ID
         executionGraph = currentJobs.get(jobGraph.getJobID) match {
-          case Some((graph, jobInfo)) =>
-            jobInfo.setLastActive()
+          case Some((graph, currentJobInfo)) =>
+            currentJobInfo.setLastActive()
             graph
           case None =>
             val graph = new ExecutionGraph(
@@ -664,11 +760,7 @@ class JobManager(
               jobGraph.getUserJarBlobKeys,
               jobGraph.getClasspaths,
               userCodeLoader)
-            val jobInfo = JobInfo(
-              client,
-              System.currentTimeMillis(),
-              jobGraph.getSessionTimeout)
-            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+
             graph
         }
 
@@ -682,7 +774,7 @@ class JobManager(
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode())
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
-        
+
         try {
           executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
         }
@@ -691,7 +783,7 @@ class JobManager(
             log.warn("Cannot create JSON plan for job", t)
             executionGraph.setJsonPlan("{}")
         }
-        
+
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
         if (log.isDebugEnabled) {
@@ -701,62 +793,67 @@ class JobManager(
         val numSlots = scheduler.getTotalNumberOfSlots()
 
         for (vertex <- jobGraph.getVertices.asScala) {
-
           val executableClass = vertex.getInvokableClassName
           if (executableClass == null || executableClass.length == 0) {
             throw new JobSubmissionException(jobId,
               s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
           }
 
-              if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-                vertex.setParallelism(numSlots)
-              }
+          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            vertex.setParallelism(numSlots)
+          }
 
-              try {
-                vertex.initializeOnMaster(userCodeLoader)
-              }
-              catch {
+          try {
+            vertex.initializeOnMaster(userCodeLoader)
+          }
+          catch {
             case t: Throwable =>
               throw new JobExecutionException(jobId,
                 "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
-              }
-            }
+          }
+        }
 
-            // topologically sort the job vertices and attach the graph to the existing one
-            val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
-            if (log.isDebugEnabled) {
-              log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-                s"job graph ${jobId} (${jobName}).")
-            }
-            executionGraph.attachJobGraph(sortedTopology)
+        // topologically sort the job vertices and attach the graph to the existing one
+        val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
+        if (log.isDebugEnabled) {
+          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+            s"job graph ${jobId} (${jobName}).")
+        }
+        executionGraph.attachJobGraph(sortedTopology)
 
-            if (log.isDebugEnabled) {
-              log.debug("Successfully created execution graph from job " +
-                s"graph ${jobId} (${jobName}).")
-            }
+        if (log.isDebugEnabled) {
+          log.debug("Successfully created execution graph from job " +
+            s"graph ${jobId} (${jobName}).")
+        }
 
-            // configure the state checkpointing
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
+        // configure the state checkpointing
+        val snapshotSettings = jobGraph.getSnapshotSettings
+        if (snapshotSettings != null) {
+          val jobId = jobGraph.getJobID()
 
-              val idToVertex: JobVertexID => ExecutionJobVertex = id => {
-                val vertex = executionGraph.getJobVertex(id)
-                if (vertex == null) {
-                  throw new JobSubmissionException(jobId,
-                    "The snapshot checkpointing settings refer to non-existent vertex " + id)
-                }
-                vertex
-              }
+          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+            val vertex = executionGraph.getJobVertex(id)
+            if (vertex == null) {
+              throw new JobSubmissionException(jobId,
+                "The snapshot checkpointing settings refer to non-existent vertex " + id)
+            }
+            vertex
+          }
 
-              val triggerVertices: java.util.List[ExecutionJobVertex] =
+          val triggerVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
 
-              val ackVertices: java.util.List[ExecutionJobVertex] =
+          val ackVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
 
-              val confirmVertices: java.util.List[ExecutionJobVertex] =
+          val confirmVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
 
+          val completedCheckpoints = checkpointRecoveryFactory
+            .createCompletedCheckpoints(jobId, userCodeLoader)
+
+          val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
+
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
@@ -764,23 +861,39 @@ class JobManager(
             ackVertices,
             confirmVertices,
             context.system,
-            leaderSessionID.orNull)
+            leaderSessionID.orNull,
+            checkpointIdCounter,
+            completedCheckpoints,
+            recoveryMode)
         }
 
         // get notified about job status changes
         executionGraph.registerJobStatusListener(
           new AkkaActorGateway(self, leaderSessionID.orNull))
 
-        if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+        if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
           // the sender wants to be notified about state changes
-          val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull)
+          val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
 
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
 
+        if (isRecovery) {
+          executionGraph.restoreLatestCheckpointedState()
+        }
+        else {
+          submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
+        }
+
+        // Add the job graph only after everything is finished. Otherwise there can be races in
+        // tests, which check the currentJobs (for example before killing a JM).
+        if (!currentJobs.contains(jobId)) {
+          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
+        }
+
         // done with submitting the job
-        sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
       }
       catch {
         case t: Throwable =>
@@ -799,33 +912,61 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
           }
 
-          sender() ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
+          jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
           return
       }
 
-      // NOTE: Scheduling the job for execution is a separate action from the job submission.
-      // The success of submitting the job must be independent from the success of scheduling
-      // the job.
-      try {
-        log.info(s"Scheduling job ${executionGraph.getJobName}.")
-        executionGraph.scheduleForExecution(scheduler)
-      }
-      catch {
-        case t: Throwable => try {
-          executionGraph.fail(t)
+      if (leaderElectionService.hasLeadership) {
+        // There is a small chance that multiple job managers schedule the same job if they
+        // try to recover at the same time. This will eventually be noticed, but can not be ruled
+        // out from the beginning.
+
+        // NOTE: Scheduling the job for execution is a separate action from the job submission.
+        // The success of submitting the job must be independent from the success of scheduling
+        // the job.
+        try {
+          log.info(s"Scheduling job $jobId ($jobName).")
+
+          executionGraph.scheduleForExecution(scheduler)
         }
         catch {
-          case tt: Throwable => {
-            log.error("Error while marking ExecutionGraph as failed.", tt)
+          case t: Throwable => try {
+            executionGraph.fail(t)
+          }
+          catch {
+            case tt: Throwable => {
+              log.error("Error while marking ExecutionGraph as failed.", tt)
+            }
           }
         }
       }
+      else {
+        // Remove the job graph. Otherwise it will be lingering around and possibly removed from
+        // ZooKeeper by this JM.
+        currentJobs.remove(jobId)
+
+        log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
+          "this. I am not scheduling the job for execution.")
+      }
+    }
+  }
+
+  /**
+   * Submits the job if it is not already one of our current jobs.
+   *
+   * @param jobGraph Job to recover
+   */
+  private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = {
+    if (!currentJobs.contains(jobGraph.getJobId)) {
+      future {
+        submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true)
+      }(context.dispatcher)
     }
   }
 
   /**
    * Dedicated handler for checkpoint messages.
-   * 
+   *
    * @param actorMessage The checkpoint actor message.
    */
   private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
@@ -836,13 +977,15 @@ class JobManager(
           case Some((graph, _)) =>
             val coordinator = graph.getCheckpointCoordinator()
             if (coordinator != null) {
-              try {
-                coordinator.receiveAcknowledgeMessage(ackMessage)
-              }
-              catch {
-                case t: Throwable =>
-                  log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
-              }
+              future {
+                try {
+                  coordinator.receiveAcknowledgeMessage(ackMessage)
+                }
+                catch {
+                  case t: Throwable =>
+                    log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
+                }
+              }(context.dispatcher)
             }
             else {
               log.error(
@@ -1020,30 +1163,46 @@ class JobManager(
   }
 
   /**
-   * Removes the job and sends it to the MemoryArchivist
+   * Removes the job and sends it to the MemoryArchivist.
+   *
+   * This should be called asynchronously. Removing the job from the [[SubmittedJobGraphStore]]
+   * might block. Therefore be careful not to block the actor thread.
+   *
    * @param jobID ID of the job to remove and archive
    */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.synchronized {
-      currentJobs.remove(jobID) match {
+      // Don't remove the job yet...
+      currentJobs.get(jobID) match {
         case Some((eg, _)) =>
           try {
+            // ...otherwise, we can have lingering resources when there is a concurrent shutdown
+            // and the ZooKeeper client is closed. Not removing the job immediately allows the
+            // shutdown to release all resources.
+            submittedJobGraphs.removeJobGraph(jobID)
+          } catch {
+            case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+          }
+
+          try {
             eg.prepareForArchiving()
+
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
           } catch {
             case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
               "archiving.", t)
           }
 
+          currentJobs.remove(jobID)
         case None =>
       }
+    }
 
-      try {
-        libraryCacheManager.unregisterJob(jobID)
-      } catch {
-        case t: Throwable =>
-          log.error(s"Could not properly unregister job $jobID form the library cache.", t)
-      }
+    try {
+      libraryCacheManager.unregisterJob(jobID)
+    } catch {
+      case t: Throwable =>
+        log.error(s"Could not properly unregister job $jobID form the library cache.", t)
     }
   }
 
@@ -1053,17 +1212,21 @@ class JobManager(
     * @param cause Cause for the cancelling.
     */
   private def cancelAndClearEverything(cause: Throwable) {
-    for((jobID, (eg, jobInfo)) <- currentJobs) {
+    for ((jobID, (eg, jobInfo)) <- currentJobs) {
+      try {
+        submittedJobGraphs.removeJobGraph(jobID)
+      }
+      catch {
+        case t: Throwable => {
+          log.error("Error during submitted job graph clean up.", t)
+        }
+      }
+
       eg.fail(cause)
 
-      if(jobInfo.client != ActorRef.noSender) {
+      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
         jobInfo.client ! decorateMessage(
-          Failure(
-            new JobExecutionException(
-              jobID,
-              "All jobs are cancelled and cleared.",
-              cause)
-          ))
+          Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
       }
     }
 
@@ -1079,6 +1242,25 @@ class JobManager(
     self ! decorateMessage(RevokeLeadership)
   }
 
+  override def onAddedJobGraph(jobId: JobID): Unit = {
+    if (leaderSessionID.isDefined && !currentJobs.contains(jobId)) {
+      self ! decorateMessage(RecoverJob(jobId))
+    }
+  }
+
+  override def onRemovedJobGraph(jobId: JobID): Unit = {
+    if (leaderSessionID.isDefined) {
+      currentJobs.get(jobId).foreach(
+        job =>
+          future {
+            // Fail the execution graph
+            job._1.fail(new IllegalStateException("Another JobManager removed the job from " +
+              "ZooKeeper."))
+          }(context.dispatcher)
+      )
+    }
+  }
+
   override def getAddress: String = {
     AkkaUtils.getAkkaURL(context.system, self)
   }
@@ -1166,7 +1348,7 @@ object JobManager {
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
-    if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+    if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
       // address and will not be reachable from anyone remote
       if (listeningPort != 0) {
         val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
@@ -1227,7 +1409,7 @@ object JobManager {
    *
    * @param configuration The configuration object for the JobManager.
    * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
-   *                      additional TaskManager in the same process.
+   *                      additional TaskManager in the same process.
    * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
@@ -1480,7 +1662,7 @@ object JobManager {
 
     // high availability mode
     val port: Int =
-      if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+      if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
         LOG.info("Starting JobManager in High-Availability Mode")
 
         configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
@@ -1524,7 +1706,9 @@ object JobManager {
     Long, // delay between retries
     FiniteDuration, // timeout
     Int, // number of archived jobs
-    LeaderElectionService) = {
+    LeaderElectionService,
+    SubmittedJobGraphStore,
+    CheckpointRecoveryFactory) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -1588,10 +1772,31 @@ object JobManager {
       }
     }
 
-    val leaderElectionService = leaderElectionServiceOption match {
-      case Some(les) => les
-      case None => LeaderElectionUtils.createLeaderElectionService(configuration)
-    }
+    // Create recovery related components
+    val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
+      RecoveryMode.fromConfig(configuration) match {
+        case RecoveryMode.STANDALONE =>
+          val leaderElectionService = leaderElectionServiceOption match {
+            case Some(les) => les
+            case None => new StandaloneLeaderElectionService()
+          }
+
+          (leaderElectionService,
+            new StandaloneSubmittedJobGraphStore(),
+            new StandaloneCheckpointRecoveryFactory())
+
+        case RecoveryMode.ZOOKEEPER =>
+          val client = ZooKeeperUtils.startCuratorFramework(configuration)
+
+          val leaderElectionService = leaderElectionServiceOption match {
+            case Some(les) => les
+            case None => ZooKeeperUtils.createLeaderElectionService(client, configuration)
+          }
+
+          (leaderElectionService,
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+      }
 
     (executionContext,
       instanceManager,
@@ -1599,9 +1804,11 @@ object JobManager {
       libraryCacheManager,
       executionRetries,
       delayBetweenRetries,
-      timeout, 
-      archiveCount, 
-      leaderElectionService)
+      timeout,
+      archiveCount,
+      leaderElectionService,
+      submittedJobGraphs,
+      checkpointRecoveryFactory)
   }
 
   /**
@@ -1633,6 +1840,7 @@ object JobManager {
       jobManagerClass,
       archiveClass)
   }
+
   /**
    * Starts the JobManager and job archiver based on the given configuration, in the
    * given actor system.
@@ -1646,28 +1854,30 @@ object JobManager {
    * @param streamingMode The mode to run the system in (streaming vs. batch-only)
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   * 
+   *
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
-      configuration: Configuration,
-      actorSystem: ActorSystem,
-      jobMangerActorName: Option[String],
-      archiveActorName: Option[String],
-      streamingMode: StreamingMode,
-      jobManagerClass: Class[_ <: JobManager],
-      archiveClass: Class[_ <: MemoryArchivist])
-    : (ActorRef, ActorRef) = {
+                             configuration: Configuration,
+                             actorSystem: ActorSystem,
+                             jobMangerActorName: Option[String],
+                             archiveActorName: Option[String],
+                             streamingMode: StreamingMode,
+                             jobManagerClass: Class[_ <: JobManager],
+                             archiveClass: Class[_ <: MemoryArchivist])
+  : (ActorRef, ActorRef) = {
 
     val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = createJobManagerComponents(
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    executionRetries,
+    delayBetweenRetries,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory) = createJobManagerComponents(
       configuration,
       None)
 
@@ -1691,7 +1901,9 @@ object JobManager {
       delayBetweenRetries,
       timeout,
       streamingMode,
-      leaderElectionService)
+      leaderElectionService,
+      submittedJobGraphs,
+      checkpointRecoveryFactory)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index c29df88..d776622 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -66,6 +66,18 @@ object JobManagerMessages {
     extends RequiresLeaderSessionID
 
   /**
+   * Triggers the recovery of the job with the given ID.
+   *
+   * @param jobId ID of the job to recover
+   */
+  case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
+
+  /**
+   * Triggers recovery of all available jobs.
+   */
+  case class RecoverAllJobs() extends RequiresLeaderSessionID
+
+  /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
    * sent back to the sender as a [[CancellationResponse]] message.
    *
@@ -354,6 +366,10 @@ object JobManagerMessages {
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
+
+  def getRequestJobStatus(jobId : JobID) : AnyRef = {
+    RequestJobStatus(jobId)
+  }
   
   def getRequestNumberRegisteredTaskManager : AnyRef = {
     RequestNumberRegisteredTaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 29add0e..2df3437 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -94,9 +94,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
 
-  val recoveryMode = RecoveryMode.valueOf(configuration.getString(
-    ConfigConstants.RECOVERY_MODE,
-    ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
+  val recoveryMode = RecoveryMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers