You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:22 UTC

[03/52] [abbrv] flink git commit: [FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods

[FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6486067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6486067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6486067

Branch: refs/heads/master
Commit: c64860677fb576f00c700a62a4ebb4ed415bebda
Parents: 4e5f423
Author: Kurt Young <yk...@gmail.com>
Authored: Wed Oct 12 23:25:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:22 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/StringUtils.java |  14 +
 .../apache/flink/runtime/blob/BlobServer.java   |  65 +-
 .../apache/flink/runtime/blob/BlobStore.java    |   3 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   8 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  53 +-
 .../flink/runtime/blob/VoidBlobStore.java       |   2 +-
 .../HighAvailabilityServices.java               |  13 +
 .../runtime/highavailability/NonHaServices.java |  13 +
 .../highavailability/RunningJobsRegistry.java   |  66 ++
 .../highavailability/ZookeeperHaServices.java   | 104 ++-
 .../highavailability/nonha/NonHaRegistry.java   |  62 ++
 .../runtime/jobmanager/OnCompletionActions.java |   3 +-
 .../runtime/jobmaster/JobManagerRunner.java     | 269 ++++--
 .../runtime/jobmaster/JobManagerServices.java   |  86 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 831 +++++++------------
 .../runtime/jobmaster/JobMasterGateway.java     | 112 +--
 .../jobmaster/MiniClusterJobDispatcher.java     |  61 +-
 .../jobmaster/message/ClassloadingProps.java    |   1 -
 .../message/DisposeSavepointResponse.java       |  49 --
 .../message/TriggerSavepointResponse.java       |  74 --
 .../apache/flink/runtime/rpc/RpcService.java    |   4 +-
 .../taskexecutor/JobManagerConnection.java      |  25 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 103 ++-
 .../taskexecutor/rpc/RpcInputSplitProvider.java |   8 +-
 .../rpc/RpcPartitionStateChecker.java           |   8 +-
 .../RpcResultPartitionConsumableNotifier.java   |   7 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   6 +
 .../TestingHighAvailabilityServices.java        |  14 +
 .../jobmaster/JobManagerRunnerMockTest.java     |  56 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  24 +
 30 files changed, 1184 insertions(+), 960 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 10b6304..3c32d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -335,4 +335,18 @@ public final class StringUtils {
 			return null;
 		}
 	}
+
+	public static boolean isNullOrWhitespaceOnly(String str) {
+		if (str == null || str.length() == 0) {
+			return true;
+		}
+
+		final int len = str.length();
+		for (int i = 0; i < len; i++) {
+			if (!Character.isWhitespace(str.charAt(i))) {
+				return false;
+			}
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b800500..33f9db7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -22,7 +22,11 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.NetUtils;
@@ -45,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
@@ -96,9 +101,16 @@ public class BlobServer extends Thread implements BlobService {
 	 *         thrown if the BLOB server cannot bind to a free network port
 	 */
 	public BlobServer(Configuration config) throws IOException {
-		checkNotNull(config, "Configuration");
+		this(config, createBlobStoreFromConfig(config));
+	}
 
-		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
+	public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
+		this(config, haServices.createBlobStore());
+	}
+
+	private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
+		checkNotNull(config);
+		this.blobStore = checkNotNull(blobStore);
 
 		this.blobServiceConfiguration = config;
 
@@ -107,14 +119,6 @@ public class BlobServer extends Thread implements BlobService {
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
-		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-			this.blobStore = new VoidBlobStore();
-		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
-			this.blobStore = new FileSystemBlobStore(config);
-		} else {
-			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
-		}
-
 		// configure the maximum number of concurrent connections
 		final int maxConnections = config.getInteger(
 				ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -135,13 +139,7 @@ public class BlobServer extends Thread implements BlobService {
 			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
 		}
 
-		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-			// Add shutdown hook to delete storage directory
-			this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
-		}
-		else {
-			this.shutdownHook = null;
-		}
+		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
 		if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
 				ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
@@ -451,4 +449,37 @@ public class BlobServer extends Thread implements BlobService {
 		}
 	}
 
+	private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
+		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
+
+		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
+		return new VoidBlobStore();
+		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+			final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+			if (isNullOrWhitespaceOnly(storagePath)) {
+				throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+						HighAvailabilityOptions.HA_STORAGE_PATH);
+			}
+
+			final Path path;
+			try {
+				path = new Path(storagePath);
+			} catch (Exception e) {
+				throw new IOException("Invalid path for highly available storage (" +
+						HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+			}
+
+			final FileSystem fileSystem;
+			try {
+				fileSystem = path.getFileSystem();
+			} catch (Exception e) {
+				throw new IOException("Could not create FileSystem for highly available storage (" +
+						HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+			}
+
+			return new FileSystemBlobStore(fileSystem, storagePath);
+		} else {
+			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 1e72d91..7050338 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
 /**
  * A blob store.
  */
-interface BlobStore {
+public interface BlobStore {
 
 	/**
 	 * Copies the local file to the blob store.
@@ -93,5 +93,4 @@ interface BlobStore {
 	 * Cleans up the store and deletes all blobs.
 	 */
 	void cleanUp();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index e74fa6f..136df09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;
@@ -73,7 +74,7 @@ public class BlobUtils {
 	 */
 	static File initStorageDirectory(String storageDirectory) {
 		File baseDir;
-		if (storageDirectory == null || storageDirectory.trim().isEmpty()) {
+		if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
 			baseDir = new File(System.getProperty("java.io.tmpdir"));
 		}
 		else {
@@ -81,10 +82,9 @@ public class BlobUtils {
 		}
 
 		File storageDir;
-		final int MAX_ATTEMPTS = 10;
-		int attempt;
 
-		for(attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+		final int MAX_ATTEMPTS = 10;
+		for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
 			storageDir = new File(baseDir, String.format(
 					"blobStore-%s", UUID.randomUUID().toString()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index deba738..2c05002 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -20,12 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.Files;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.IOUtils;
@@ -38,7 +33,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -47,25 +41,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>This is used in addition to the local blob storage for high availability.
  */
-class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
 
+	/** The file system in which blobs are stored */
+	private final FileSystem fileSystem;
+	
 	/** The base path of the blob store */
 	private final String basePath;
 
-	FileSystemBlobStore(Configuration config) throws IOException {
-		String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
-
-		if (storagePath == null || StringUtils.isBlank(storagePath)) {
-			throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
-					" Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
-		}
+	public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
+		this.fileSystem = checkNotNull(fileSystem);
+		this.basePath = checkNotNull(storagePath) + "/blob";
 
-		this.basePath = storagePath + "/blob";
+		LOG.info("Creating highly available BLOB storage directory at {}", basePath);
 
-		FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath));
-		LOG.info("Created blob directory {}.", basePath);
+		fileSystem.mkdirs(new Path(basePath));
+		LOG.debug("Created highly available BLOB storage directory at {}", basePath);
 	}
 
 	// - Put ------------------------------------------------------------------
@@ -81,9 +74,7 @@ class FileSystemBlobStore implements BlobStore {
 	}
 
 	private void put(File fromFile, String toBlobPath) throws Exception {
-		try (OutputStream os = FileSystem.get(new URI(toBlobPath))
-				.create(new Path(toBlobPath), true)) {
-
+		try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) {
 			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
 			Files.copy(fromFile, os);
 		}
@@ -106,16 +97,15 @@ class FileSystemBlobStore implements BlobStore {
 		checkNotNull(toFile, "File");
 
 		if (!toFile.exists() && !toFile.createNewFile()) {
-			throw new IllegalStateException("Failed to create target file to copy to");
+			throw new IOException("Failed to create target file to copy to");
 		}
 
-		final URI fromUri = new URI(fromBlobPath);
 		final Path fromPath = new Path(fromBlobPath);
 
-		if (FileSystem.get(fromUri).exists(fromPath)) {
-			try (InputStream is = FileSystem.get(fromUri).open(fromPath)) {
-				FileOutputStream fos = new FileOutputStream(toFile);
-
+		if (fileSystem.exists(fromPath)) {
+			try (InputStream is = fileSystem.open(fromPath);
+				FileOutputStream fos = new FileOutputStream(toFile))
+			{
 				LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
 				IOUtils.copyBytes(is, fos); // closes the streams
 			}
@@ -145,17 +135,16 @@ class FileSystemBlobStore implements BlobStore {
 	private void delete(String blobPath) {
 		try {
 			LOG.debug("Deleting {}.", blobPath);
-
-			FileSystem fs = FileSystem.get(new URI(blobPath));
+			
 			Path path = new Path(blobPath);
 
-			fs.delete(path, true);
+			fileSystem.delete(path, true);
 
 			// send a call to delete the directory containing the file. This will
 			// fail (and be ignored) when some files still exist.
 			try {
-				fs.delete(path.getParent(), false);
-				fs.delete(new Path(basePath), false);
+				fileSystem.delete(path.getParent(), false);
+				fileSystem.delete(new Path(basePath), false);
 			} catch (IOException ignored) {}
 		}
 		catch (Exception e) {
@@ -168,7 +157,7 @@ class FileSystemBlobStore implements BlobStore {
 		try {
 			LOG.debug("Cleaning up {}.", basePath);
 
-			FileSystem.get(new URI(basePath)).delete(new Path(basePath), true);
+			fileSystem.delete(new Path(basePath), true);
 		}
 		catch (Exception e) {
 			LOG.error("Failed to clean up recovery directory.");

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 1b71add..ece2ac1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
 /**
  * A blob store doing nothing.
  */
-class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStore {
 
 	@Override
 	public void put(File localFile, BlobKey blobKey) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index a26886a..5d78ffc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
+import java.io.IOException;
+
 /**
  * This class gives access to all services needed for
  *
@@ -72,4 +75,14 @@ public interface HighAvailabilityServices {
 	 * Gets the submitted job graph store for the job manager
 	 */
 	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
+
+	/**
+	 * Gets the registry that holds information about whether jobs are currently running.
+	 */
+	RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+
+	/**
+	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
+	 */
+	BlobStore createBlobStore() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 2c6295c..d7fd2bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -102,4 +105,14 @@ public class NonHaServices implements HighAvailabilityServices {
 	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
 		return new StandaloneSubmittedJobGraphStore();
 	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		return new NonHaRegistry();
+	}
+
+	@Override
+	public BlobStore createBlobStore() {
+		return new VoidBlobStore();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
new file mode 100644
index 0000000..e7c131c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
+
+/**
+ * This registry tracks if a certain job is running.
+ * 
+ * <p>This registry is used in highly-available setups with multiple master nodes,
+ * to determine whether a new leader should attempt to recover a certain job (because the 
+ * job is still running), or whether the job has already finished successfully (in case of a
+ * finite job) and the leader has only been granted leadership because the previous leader
+ * quit cleanly after the job was finished.
+ */
+public interface RunningJobsRegistry {
+
+	/**
+	 * Marks a job as running.
+	 * 
+	 * @param jobID The id of the job.
+	 *
+	 * @throws IOException Thrown when the communication with the highly-available storage or registry
+	 *                     failed and could not be retried.
+	 */
+	void setJobRunning(JobID jobID) throws IOException;
+
+	/**
+	 * Marks a job as finished.
+	 *
+	 * @param jobID The id of the job.
+	 * 
+	 * @throws IOException Thrown when the communication with the highly-available storage or registry
+	 *                     failed and could not be retried.
+	 */
+	void setJobFinished(JobID jobID) throws IOException;
+
+	/**
+	 * Checks whether a job is running.
+	 *
+	 * @param jobID The id of the job to check.
+	 * @return True if the job is still running, false otherwise.
+	 * 
+	 * @throws IOException Thrown when the communication with the highly-available storage or registry
+	 *                     failed and could not be retried.
+	 */
+	boolean isJobRunning(JobID jobID) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index d25965d..d32068e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -19,8 +19,15 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.curator.framework.CuratorFramework;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -28,16 +35,57 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.io.IOException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
- * An implementation of the {@link HighAvailabilityServices} with zookeeper.
+ * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
+ * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
+ * 
+ * <pre>
+ * /flink
+ *      +/cluster_id_1/resource_manager_lock
+ *      |            |
+ *      |            +/job-id-1/job_manager_lock
+ *      |            |         /checkpoints/latest
+ *      |            |                     /latest-1
+ *      |            |                     /latest-2
+ *      |            |
+ *      |            +/job-id-2/job_manager_lock
+ *      |      
+ *      +/cluster_id_2/resource_manager_lock
+ *                   |
+ *                   +/job-id-1/job_manager_lock
+ *                            |/checkpoints/latest
+ *                            |            /latest-1
+ *                            |/persisted_job_graph
+ * </pre>
+ * 
+ * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+ * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
+ * accommodate specific permissions.
+ * 
+ * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster". 
+ * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
+ * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
+ * 
+ * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
+ * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
+ * 
+ * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
+ * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
+ * cluster and participate in the execution of the same set of jobs.
  */
 public class ZookeeperHaServices implements HighAvailabilityServices {
 
-	private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager";
+	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
+
+	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
+
+	// ------------------------------------------------------------------------
 
 	/** The ZooKeeper client to use */
 	private final CuratorFramework client;
@@ -54,24 +102,28 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 		this.configuration = checkNotNull(configuration);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
 	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
-		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
-		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID));
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
-		return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
-		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID));
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
@@ -84,7 +136,43 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
 	}
 
-	private static String getPathSuffixForJob(final JobID jobID) {
-		return String.format("/job-managers/%s", jobID);
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		throw new UnsupportedOperationException("not yet implemented");
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+		if (isNullOrWhitespaceOnly(storagePath)) {
+			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+					HighAvailabilityOptions.HA_STORAGE_PATH);
+		}
+
+		final Path path;
+		try {
+			path = new Path(storagePath);
+		} catch (Exception e) {
+			throw new IOException("Invalid path for highly available storage (" +
+					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final FileSystem fileSystem;
+		try {
+			fileSystem = path.getFileSystem();
+		} catch (Exception e) {
+			throw new IOException("Could not create FileSystem for highly available storage (" +
+					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		return new FileSystemBlobStore(fileSystem, storagePath);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static String getPathForJobManager(final JobID jobID) {
+		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
new file mode 100644
index 0000000..85dd711
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RunningJobsRegistry} for setups without high availability. The set of running
+ * jobs is kept in memory only.
+ */
+public class NonHaRegistry implements RunningJobsRegistry {
+
+	/** IDs of the jobs currently marked as running. All access synchronizes on this set. */
+	private final HashSet<JobID> runningJobIds = new HashSet<>();
+
+	/** Marks the given (non-null) job as running. */
+	@Override
+	public void setJobRunning(JobID jobID) {
+		final JobID id = checkNotNull(jobID);
+
+		synchronized (runningJobIds) {
+			runningJobIds.add(id);
+		}
+	}
+
+	/** Removes the given (non-null) job from the set of running jobs. */
+	@Override
+	public void setJobFinished(JobID jobID) {
+		final JobID id = checkNotNull(jobID);
+
+		synchronized (runningJobIds) {
+			runningJobIds.remove(id);
+		}
+	}
+
+	/** Returns whether the given (non-null) job is currently marked as running. */
+	@Override
+	public boolean isJobRunning(JobID jobID) {
+		final JobID id = checkNotNull(jobID);
+
+		final boolean isRunning;
+		synchronized (runningJobIds) {
+			isRunning = runningJobIds.contains(id);
+		}
+		return isRunning;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 6de4253..25a2a66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 
-public interface OnCompletionActions extends FatalErrorHandler {
+public interface OnCompletionActions {
 
 	void jobFinished(JobExecutionResult result);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index a096932..74ca6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,26 +21,38 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The runner for the job manager. It deals with job level leader election and make underlying job manager
  * properly reacted.
  */
-public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
 
-	private final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+	private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+
+	// ------------------------------------------------------------------------
 
 	/** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */
 	private final Object lock = new Object();
@@ -48,52 +60,140 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	/** The job graph needs to run */
 	private final JobGraph jobGraph;
 
-	private final OnCompletionActions toNotify;
+	/** The listener to notify once the job completes - either successfully or unsuccessfully */
+	private final OnCompletionActions toNotifyOnComplete;
+
+	/** The handler to call in case of fatal (unrecoverable) errors */ 
+	private final FatalErrorHandler errorHandler;
 
 	/** Used to check whether a job needs to be run */
-	private final SubmittedJobGraphStore submittedJobGraphStore;
+	private final RunningJobsRegistry runningJobsRegistry;
 
 	/** Leader election for this job */
 	private final LeaderElectionService leaderElectionService;
 
+	private final JobManagerServices jobManagerServices;
+
 	private final JobMaster jobManager;
 
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
 	/** flag marking the runner as shut down */
 	private volatile boolean shutdown;
 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a runner that builds its own {@code MetricRegistry} from the given configuration
+	 * and then delegates to the constructor that takes a metric registry.
+	 *
+	 * <p>NOTE(review): the registry created here is only passed on and not kept by this
+	 * constructor - confirm which component is responsible for shutting it down.
+	 *
+	 * @throws Exception Thrown if the delegated constructor fails.
+	 */
 	public JobManagerRunner(
-		final JobGraph jobGraph,
-		final Configuration configuration,
-		final RpcService rpcService,
-		final HighAvailabilityServices haServices,
-		final OnCompletionActions toNotify) throws Exception
+			final JobGraph jobGraph,
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices haServices,
+			final OnCompletionActions toNotifyOnComplete,
+			final FatalErrorHandler errorHandler) throws Exception
 	{
 		this(jobGraph, configuration, rpcService, haServices,
-			JobManagerServices.fromConfiguration(configuration), toNotify);
+				new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
+				toNotifyOnComplete, errorHandler);
 	}
 
+	/**
+	 * Creates a runner that uses the given metric registry and creates its
+	 * {@code JobManagerServices} via
+	 * {@code JobManagerServices.fromConfiguration(configuration, haServices)}, then delegates
+	 * to the constructor that takes the services explicitly.
+	 *
+	 * @throws Exception Thrown if the services cannot be created or the delegated
+	 *                   constructor fails.
+	 */
 	public JobManagerRunner(
-		final JobGraph jobGraph,
-		final Configuration configuration,
-		final RpcService rpcService,
-		final HighAvailabilityServices haServices,
-		final JobManagerServices jobManagerServices,
-		final OnCompletionActions toNotify) throws Exception
+			final JobGraph jobGraph,
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices haServices,
+			final MetricRegistry metricRegistry,
+			final OnCompletionActions toNotifyOnComplete,
+			final FatalErrorHandler errorHandler) throws Exception
+	{
+		this(jobGraph, configuration, rpcService, haServices,
+				JobManagerServices.fromConfiguration(configuration, haServices),
+				metricRegistry,
+				toNotifyOnComplete, errorHandler);
+	}
+
+	/**
+	 * Creates the runner for the given job: registers the job's libraries with the library
+	 * cache manager, obtains the running-jobs registry and the job's leader election service
+	 * from the high-availability services, and constructs the {@code JobMaster}.
+	 *
+	 * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly
+	 * thrown and not reported to the given {@code FatalErrorHandler}.
+	 *
+	 * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}.
+	 * It will shut them down on error and on calls to {@link #shutdown()}.
+	 *
+	 * @throws Exception Thrown if the runner cannot be set up, because either one of the
+	 *                   required services could not be started, or the Job could not be initialized.
+	 */
+	public JobManagerRunner(
+			final JobGraph jobGraph,
+			final Configuration configuration,
+			final RpcService rpcService,
+			final HighAvailabilityServices haServices,
+			final JobManagerServices jobManagerServices,
+			final MetricRegistry metricRegistry,
+			final OnCompletionActions toNotifyOnComplete,
+			final FatalErrorHandler errorHandler) throws Exception
 	{
-		this.jobGraph = jobGraph;
-		this.toNotify = toNotify;
-		this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore();
-		this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
-
-		this.jobManager = new JobMaster(
-			jobGraph, configuration, rpcService, haServices,
-			jobManagerServices.libraryCacheManager,
-			jobManagerServices.restartStrategyFactory,
-			jobManagerServices.savepointStore,
-			jobManagerServices.timeout,
-			new Scheduler(jobManagerServices.executorService),
-			jobManagerServices.jobManagerMetricGroup,
-			this);
+
+		// kept in a local (in addition to the field) so the catch block below can close it
+		JobManagerMetricGroup jobManagerMetrics = null;
+
+		// make sure we cleanly shut down our JobManager services if initialization fails
+		try {
+			this.jobGraph = checkNotNull(jobGraph);
+			this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete);
+			this.errorHandler = checkNotNull(errorHandler);
+			this.jobManagerServices = checkNotNull(jobManagerServices);
+
+			checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
+
+			// fall back to "localhost" when the RPC service reports an empty address
+			final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
+			jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
+			this.jobManagerMetricGroup = jobManagerMetrics;
+
+			// libraries and class loader first
+			final BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager;
+			try {
+				libraryCacheManager.registerJob(
+						jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
+			} catch (IOException e) {
+				throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
+			}
+
+			final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
+			if (userCodeLoader == null) {
+				throw new Exception("The user code class loader could not be initialized.");
+			}
+
+			// high availability services next
+			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
+			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
+
+			// now start the JobManager
+			this.jobManager = new JobMaster(
+					jobGraph, configuration,
+					rpcService,
+					haServices,
+					jobManagerServices.executorService,
+					jobManagerServices.libraryCacheManager,
+					jobManagerServices.restartStrategyFactory,
+					jobManagerServices.rpcAskTimeout,
+					jobManagerMetrics,
+					this,
+					this,
+					userCodeLoader);
+		}
+		catch (Throwable t) {
+			// clean up everything
+			try {
+				jobManagerServices.shutdown();
+			} catch (Throwable tt) {
+				log.error("Error while shutting down JobManager services", tt);
+			}
+
+			if (jobManagerMetrics != null) {
+				jobManagerMetrics.close();
+			}
+
+			// NOTE(review): if the failure was the null check on jobGraph above, then
+			// jobGraph.getJobID() fails here and hides the original error - confirm
+			// whether that case needs handling
+			throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
+		}
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -101,9 +201,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	//----------------------------------------------------------------------------------------------
 
 	public void start() throws Exception {
-		jobManager.init();
-		jobManager.start();
-
 		try {
 			leaderElectionService.start(this);
 		}
@@ -114,11 +211,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	}
 
 	public void shutdown() {
-		shutdown(new Exception("The JobManager runner is shutting down"));
-	}
-
-	public void shutdown(Throwable cause) {
-		// TODO what is the cause used for ?
 		shutdownInternally();
 	}
 
@@ -129,12 +221,29 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 			if (leaderElectionService != null) {
 				try {
 					leaderElectionService.stop();
-				} catch (Exception e) {
-					log.error("Could not properly shutdown the leader election service.");
+				} catch (Throwable t) {
+					log.error("Could not properly shutdown the leader election service", t);
 				}
 			}
 
-			jobManager.shutDown();
+			try {
+				jobManager.shutDown();
+			} catch (Throwable t) {
+				log.error("Error shutting down JobManager", t);
+			}
+
+			try {
+				jobManagerServices.shutdown();
+			} catch (Throwable t) {
+				log.error("Error shutting down JobManager services", t);
+			}
+
+			// make all registered metrics go away
+			try {
+				jobManagerMetricGroup.close();
+			} catch (Throwable t) {
+				log.error("Error while unregistering metrics", t);
+			}
 		}
 	}
 
@@ -148,11 +257,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	@Override
 	public void jobFinished(JobExecutionResult result) {
 		try {
+			// mark the job as finished in the running-jobs registry before shutting down
+			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
+			// the listener is notified even if the shutdown above throws
-			if (toNotify != null) {
-				toNotify.jobFinished(result);
+			if (toNotifyOnComplete != null) {
+				toNotifyOnComplete.jobFinished(result);
 			}
 		}
 	}
@@ -163,11 +273,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	@Override
 	public void jobFailed(Throwable cause) {
 		try {
+			// mark the job as finished in the running-jobs registry before shutting down
+			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
+			// the listener is notified even if the shutdown above throws
-			if (toNotify != null) {
-				toNotify.jobFailed(cause);
+			if (toNotifyOnComplete != null) {
+				toNotifyOnComplete.jobFailed(cause);
 			}
 		}
 	}
@@ -178,11 +289,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	@Override
 	public void jobFinishedByOther() {
 		try {
+			// mark the job as finished in the running-jobs registry before shutting down
+			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
+			// the listener is notified even if the shutdown above throws
-			if (toNotify != null) {
-				toNotify.jobFinishedByOther();
+			if (toNotifyOnComplete != null) {
+				toNotifyOnComplete.jobFinishedByOther();
 			}
 		}
 	}
@@ -192,18 +304,43 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	 */
 	@Override
 	public void onFatalError(Throwable exception) {
-		// first and in any case, notify our handler, so it can react fast
+		// we log first to make sure an explanatory message goes into the log.
+		// we even guard the log statement here to increase the chances that the error handler
+		// gets the notification in hard critical situations like out-of-memory errors.
+		// a failure of the log call itself is deliberately ignored for that reason.
+		try {
+			log.error("JobManager runner encountered a fatal error.", exception);
+		} catch (Throwable ignored) {}
+
+		// in any case, notify our handler, so it can react fast
 		try {
-			if (toNotify != null) {
-				toNotify.onFatalError(exception);
+			if (errorHandler != null) {
+				errorHandler.onFatalError(exception);
 			}
 		}
 		finally {
-			log.error("JobManager runner encountered a fatal error.", exception);
+			// the shutdown may not even be needed any more, if the fatal error
+			// handler kills the process. that is fine, a process kill cleans up better than anything.
 			shutdownInternally();
 		}
 	}
 
+	/**
+	 * Marks this runner's job as finished in the running-jobs registry. Other JobManagers
+	 * will not recover the job after this call.
+	 *
+	 * <p>A failure of the registry call is caught and logged rather than propagated.
+	 */
+	private void unregisterJobFromHighAvailability() {
+		try {
+			runningJobsRegistry.setJobFinished(jobGraph.getJobID());
+		}
+		catch (Throwable t) {
+			// the two sentences are concatenated, so the first must end with a space
+			log.error("Could not un-register from high-availability services job {} ({}). " +
+					"Other JobManager's may attempt to recover it and re-execute it.",
+					jobGraph.getName(), jobGraph.getJobID(), t);
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Leadership methods
 	//----------------------------------------------------------------------------------------------
@@ -223,15 +360,25 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 			// it's okay that job manager wait for the operation complete
 			leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
+			boolean jobRunning;
+			try {
+				jobRunning = runningJobsRegistry.isJobRunning(jobGraph.getJobID());
+			} catch (Throwable t) {
+				log.error("Could not access status (running/finished) of job {}. " +
+						"Falling back to assumption that job is running and attempting recovery...",
+						jobGraph.getJobID(), t);
+				jobRunning = true;
+			}
+
 			// Double check the leadership after we confirm that, there is a small chance that multiple
 			// job managers schedule the same job after if they try to recover at the same time.
 			// This will eventually be noticed, but can not be ruled out from the beginning.
 			if (leaderElectionService.hasLeadership()) {
-				if (isJobFinishedByOthers()) {
+				if (jobRunning) {
+					jobManager.start(leaderSessionID);
+				} else {
 					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
 					jobFinishedByOther();
-				} else {
-					jobManager.getSelf().startJob(leaderSessionID);
 				}
 			}
 		}
@@ -248,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 			log.info("JobManager for job {} ({}) was revoked leadership at {}.",
 				jobGraph.getName(), jobGraph.getJobID(), getAddress());
 
-			jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
+			jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader."));
 		}
 	}
 
@@ -263,11 +410,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 		onFatalError(exception);
 	}
 
-	@VisibleForTesting
-	boolean isJobFinishedByOthers() {
-		// TODO: Fix
-		return false;
-	}
+	//----------------------------------------------------------------------------------------------
+	// Testing
+	//----------------------------------------------------------------------------------------------
 
 	@VisibleForTesting
 	boolean isShutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e6beba6..fff75d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,13 +19,20 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,34 +47,81 @@ public class JobManagerServices {
 
 	public final RestartStrategyFactory restartStrategyFactory;
 
-	public final SavepointStore savepointStore;
-
-	public final Time timeout;
-
-	public final JobManagerMetricGroup jobManagerMetricGroup;
+	public final Time rpcAskTimeout;
 
+	/**
+	 * Bundles the given services. Every argument is passed through {@code checkNotNull}
+	 * before it is stored in the public field of the same name.
+	 */
 	public JobManagerServices(
 			ExecutorService executorService,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
-			SavepointStore savepointStore,
-			Time timeout,
-			JobManagerMetricGroup jobManagerMetricGroup) {
+			Time rpcAskTimeout) {
 
 		this.executorService = checkNotNull(executorService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
-		this.savepointStore = checkNotNull(savepointStore);
-		this.timeout = checkNotNull(timeout);
-		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+		this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
+	}
+
+	/**
+	 * Shuts down the executor service and the library cache manager.
+	 *
+	 * <p>Both components are always attempted, even if shutting down the first one fails.
+	 * The first failure is handed to {@code ExceptionUtils.rethrowException} at the end;
+	 * a later failure is attached to it as a suppressed exception.
+	 *
+	 * @throws Exception The first Exception encountered during shutdown.
+	 */
+	public void shutdown() throws Exception {
+		Throwable failure = null;
+
+		try {
+			executorService.shutdownNow();
+		} catch (Throwable t) {
+			failure = t;
+		}
+
+		try {
+			libraryCacheManager.shutdown();
+		} catch (Throwable t) {
+			if (failure != null) {
+				failure.addSuppressed(t);
+			} else {
+				failure = t;
+			}
+		}
+
+		if (failure == null) {
+			return;
+		}
+		ExceptionUtils.rethrowException(failure, "Error while shutting down JobManager services");
 	}
 
 	// ------------------------------------------------------------------------
 	//  Creating the components from a configuration 
 	// ------------------------------------------------------------------------
 	
-	public static JobManagerServices fromConfiguration(Configuration config) throws Exception {
-		// TODO not yet implemented
-		return null;
+
+	/**
+	 * Creates the JobManager services from the given configuration.
+	 *
+	 * <p>Values that are only parsed from the configuration (the ask timeout, the restart
+	 * strategy factory, the cleanup interval) are resolved first. The blob server, the library
+	 * cache manager and the executor are created only afterwards, so a configuration error
+	 * cannot leave behind components that were created but never shut down.
+	 *
+	 * @param config the configuration to read the settings from
+	 * @param haServices the high-availability services passed to the {@code BlobServer}
+	 * @return the services bundle, using a new {@link ForkJoinPool} as executor
+	 * @throws IllegalConfigurationException if the timeout value cannot be parsed as a number
+	 * @throws Exception if one of the components cannot be created
+	 */
+	public static JobManagerServices fromConfiguration(
+			Configuration config,
+			HighAvailabilityServices haServices) throws Exception {
+
+		// pure configuration parsing first: nothing needs to be cleaned up if this fails
+		final FiniteDuration timeout;
+		try {
+			timeout = AkkaUtils.getTimeout(config);
+		} catch (NumberFormatException e) {
+			throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
+		}
+		final Time rpcAskTimeout = Time.of(timeout.length(), timeout.unit());
+
+		final RestartStrategyFactory restartStrategyFactory =
+			RestartStrategyFactory.createRestartStrategyFactory(config);
+
+		// the configured value is multiplied by 1000 before being passed on
+		final long cleanupInterval = config.getLong(
+			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+		// only now create the components that the returned services own
+		final BlobServer blobServer = new BlobServer(config, haServices);
+
+		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
+
+		return new JobManagerServices(
+			new ForkJoinPool(),
+			libraryCacheManager,
+			restartStrategyFactory,
+			rpcAskTimeout);
 	}
 }