You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:02 UTC

[43/52] [abbrv] flink git commit: [FLINK-5254] [yarn] Implement YARN High-Availability Services

[FLINK-5254] [yarn] Implement YARN High-Availability Services


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a7dbda7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a7dbda7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a7dbda7

Branch: refs/heads/master
Commit: 2a7dbda79a00863a511fcf64b339770d1d00f805
Parents: e2922ad
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 5 01:34:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  12 +-
 .../FsNegativeRunningJobsRegistryTest.java      | 121 ++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  20 +-
 .../highavailability/EmbeddedNonHaServices.java |  15 +-
 .../FsNegativeRunningJobsRegistry.java          | 153 ++++++++
 .../HighAvailabilityServices.java               |  69 +++-
 .../runtime/highavailability/NonHaServices.java |  21 +-
 .../highavailability/ServicesThreadFactory.java |  40 ++
 .../highavailability/ZookeeperHaServices.java   |  17 +-
 .../SingleLeaderElectionService.java            | 384 +++++++++++++++++++
 .../nonha/AbstractNonHaServices.java            |  29 +-
 .../nonha/EmbeddedLeaderService.java            |   2 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   1 -
 .../StandaloneLeaderRetrievalService.java       |   1 +
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManagerRunner.java  |   3 +-
 .../flink/runtime/rpc/RpcServiceUtils.java      |  70 ++++
 .../FsNegativeRunningJobsRegistryTest.java      |  85 ++++
 .../TestingHighAvailabilityServices.java        |  14 +-
 .../SingleLeaderElectionServiceTest.java        | 226 +++++++++++
 flink-yarn/pom.xml                              |  21 +
 ...bstractYarnFlinkApplicationMasterRunner.java |  34 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  18 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  13 +-
 .../yarn/configuration/YarnConfigOptions.java   |  49 +++
 .../AbstractYarnNonHaServices.java              | 105 +++++
 .../YarnHighAvailabilityServices.java           | 343 +++++++++++++++++
 .../YarnIntraNonHaMasterServices.java           | 188 +++++++++
 .../YarnPreConfiguredMasterNonHaServices.java   | 172 +++++++++
 .../YarnIntraNonHaMasterServicesTest.java       | 149 +++++++
 .../YarnPreConfiguredMasterHaServicesTest.java  | 234 +++++++++++
 33 files changed, 2537 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index f15c669..8f23435 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -44,7 +44,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		implements IOReadableWritable, java.io.Serializable, Cloneable {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final byte TYPE_STRING = 0;
 	private static final byte TYPE_INT = 1;
 	private static final byte TYPE_LONG = 2;
@@ -52,14 +52,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	private static final byte TYPE_FLOAT = 4;
 	private static final byte TYPE_DOUBLE = 5;
 	private static final byte TYPE_BYTES = 6;
-	
+
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
-	
+
 
 	/** Stores the concrete key/value pairs of this configuration object. */
 	protected final HashMap<String, Object> confData;
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -639,12 +639,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		Object o = getRawValue(configOption.key());
 
 		if (o != null) {
+			// found a value for the current proper key
 			return o;
 		}
 		else if (configOption.hasDeprecatedKeys()) {
+			// try the deprecated keys
 			for (String deprecatedKey : configOption.deprecatedKeys()) {
 				Object oo = getRawValue(deprecatedKey);
 				if (oo != null) {
+					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", 
+							deprecatedKey, configOption.key());
 					return oo;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..40d75e8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	// ------------------------------------------------------------------------
+	//  startup / shutdown of the MiniDFSCluster shared by all tests
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+
+		HDFS_ROOT_PATH = new Path("hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
+				+ HDFS_CLUSTER.getNameNodePort() + "/");
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCreateAndSetFinished() throws Exception {
+		final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+		// initially, without any call, the job is considered running
+		assertTrue(registry.isJobRunning(jid));
+
+		// explicitly setting the job to running should not affect the status
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// another registry on the same directory should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+		assertFalse(otherRegistry.isJobRunning(jid));
+	}
+
+	@Test
+	public void testSetFinishedAndRunning() throws Exception {
+		final Path workDir = new Path(HDFS_ROOT_PATH, "�nother_w�rk_direct�r�"); // NOTE(review): characters appear mis-encoded (U+FFFD) in this copy; presumably meant to exercise a non-ASCII path
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// set the job back to running and validate
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// another registry on the same directory should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+		assertTrue(otherRegistry.isJobRunning(jid));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 0eab032..36dfa55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -35,6 +35,8 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.UnknownHostException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
  * class is a wrapper class which encapsulated the original Hadoop HDFS API.
@@ -60,7 +62,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 
 	/**
-	 * Creates a new DistributedFileSystem object to access HDFS
+	 * Creates a new DistributedFileSystem object to access HDFS, based on a class name
+	 * and picking up the configuration from the class path or the Flink configuration.
 	 * 
 	 * @throws IOException
 	 *         throw if the required HDFS classes cannot be instantiated
@@ -76,6 +79,21 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		this.fs = instantiateFileSystem(fsClass);
 	}
 
+	/**
+	 * Creates a new HadoopFileSystem that wraps the given Hadoop
+	 * {@link org.apache.hadoop.fs.FileSystem} and uses it under the hood.
+	 *
+	 * @param hadoopConfig The Hadoop configuration that the FileSystem is based on (must not be null).
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood (must not be null).
+	 */
+	public HadoopFileSystem(
+			org.apache.hadoop.conf.Configuration hadoopConfig,
+			org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+
+		this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+	}
+
 	private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
 		Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index b91cec1..a417599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,6 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
+	public String getResourceManagerEndpointName() {
+		// dynamic actor name
+		return null;
+	}
+
+	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return resourceManagerLeaderService.createLeaderRetrievalService();
 	}
@@ -55,11 +61,16 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		try {
-			super.shutdown();
+			super.close();
 		} finally {
 			resourceManagerLeaderService.shutdown();
 		}
 	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		close(); // NOTE(review): no cleanup beyond close(); presumably these embedded services keep no external data - confirm
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
new file mode 100644
index 0000000..9d8b226
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@link RunningJobsRegistry} tracks the status of jobs, marking finished jobs
+ * via marker files.
+ *
+ * <p>The general contract is the following:
+ * <ul>
+ *     <li>Initially, a marker file does not exist (no one created it, yet), which means
+ *         the specific job is assumed to be running</li>
+ *     <li>The JobManager that finishes calls this service to create the marker file,
+ *         which marks the job as finished.</li>
+ *     <li>If a JobManager gains leadership at some point when shutdown is in progress,
+ *         it will see the marker file and realize that the job is finished.</li>
+ *     <li>The application framework is expected to clean the file once the application
+ *         is completely shut down. At that point, no JobManager will attempt to
+ *         start the job, even if it gains leadership.</li>
+ * </ul>
+ *
+ * <p>It is especially tailored towards deployment modes like for example
+ * YARN, where HDFS is available as a persistent file system, and the YARN
+ * application's working directories on HDFS are automatically cleaned
+ * up after the application completed.
+ */
+public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
+
+	private static final String PREFIX = ".job_complete_";
+
+	private final FileSystem fileSystem;
+
+	private final Path basePath;
+
+	/**
+	 * Creates a new registry that writes to the FileSystem and working directory
+	 * denoted by the given path.
+	 *
+	 * <p>The initialization will attempt to write to the given working directory, in
+	 * order to catch setup/configuration errors early.
+	 *
+	 * @param workingDirectory The working directory for files to track the job status.
+	 *
+	 * @throws IOException Thrown, if the specified directory cannot be accessed.
+	 */
+	public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException {
+		this(workingDirectory.getFileSystem(), workingDirectory);
+	}
+
+	/**
+	 * Creates a new registry that writes its files to the given FileSystem at
+	 * the given working directory path.
+	 *
+	 * <p>The initialization will attempt to write to the given working directory, in
+	 * order to catch setup/configuration errors early.
+	 *
+	 * @param fileSystem The FileSystem to use for the marker files.
+	 * @param workingDirectory The working directory for files to track the job status.
+	 *
+	 * @throws IOException Thrown, if the specified directory cannot be accessed.
+	 */
+	public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException {
+		this.fileSystem = checkNotNull(fileSystem, "fileSystem");
+		this.basePath = checkNotNull(workingDirectory, "workingDirectory");
+
+		// to be safe, attempt to write to the working directory, to
+		// catch problems early. The probe file gets a unique name, so that
+		// registries created concurrently on the same directory (or a probe file
+		// left behind by a crashed process) do not make this check fail.
+		final Path testFile = new Path(workingDirectory, ".registry_test_" + UUID.randomUUID());
+		try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
+			out.write(42);
+		}
+		catch (IOException e) {
+			final IOException failure =
+					new IOException("Unable to write to working directory: " + workingDirectory, e);
+
+			// remove a possibly partially created probe file, without masking the original error
+			try {
+				fileSystem.delete(testFile, false);
+			}
+			catch (IOException deleteFailure) {
+				failure.addSuppressed(deleteFailure);
+			}
+			throw failure;
+		}
+
+		// the registry also needs to delete files (see setJobRunning()), so a failure here is reported
+		fileSystem.delete(testFile, false);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+		final Path filePath = createMarkerFilePath(jobID);
+
+		// delete the marker file, if it exists
+		try {
+			fileSystem.delete(filePath, false);
+		}
+		catch (FileNotFoundException e) {
+			// apparently job was already considered running
+		}
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+		final Path filePath = createMarkerFilePath(jobID);
+
+		// create the file
+		// to avoid an exception if the marker file already exists, set overwrite=true
+		try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+			out.write(42);
+		}
+	}
+
+	@Override
+	public boolean isJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+
+		// check for the existence of the file
+		try {
+			fileSystem.getFileStatus(createMarkerFilePath(jobID));
+			// file was found --> job is terminated
+			return false;
+		}
+		catch (FileNotFoundException e) {
+			// file does not exist, job is still running
+			return true;
+		}
+	}
+
+	private Path createMarkerFilePath(JobID jobId) {
+		return new Path(basePath, PREFIX + jobId.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 360de7b..4169204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -26,19 +26,45 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.UUID;
 
 /**
- * This class gives access to all services needed for
- *
+ * The HighAvailabilityServices give access to all services needed for a highly-available
+ * setup. In particular, the services provide access to highly available storage and
+ * registries, as well as distributed counters and leader election.
+ * 
  * <ul>
  *     <li>ResourceManager leader election and leader retrieval</li>
  *     <li>JobManager leader election and leader retrieval</li>
  *     <li>Persistence for checkpoint metadata</li>
  *     <li>Registering the latest completed checkpoint(s)</li>
- *     <li>Persistence for submitted job graph</li>
+ *     <li>Persistence for the BLOB store</li>
+ *     <li>Registry that marks a job's status</li>
+ *     <li>Naming of RPC endpoints</li>
  * </ul>
  */
-public interface HighAvailabilityServices {
+public interface HighAvailabilityServices extends AutoCloseable {
+
+	// ------------------------------------------------------------------------
+	//  Constants
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This UUID should be used when no proper leader election happens, but a simple
+	 * pre-configured leader is used. That is for example the case in non-highly-available
+	 * standalone setups.
+	 */
+	UUID DEFAULT_LEADER_ID = new UUID(0, 0);
+
+	// ------------------------------------------------------------------------
+	//  Endpoint Naming
+	// ------------------------------------------------------------------------
+
+	String getResourceManagerEndpointName();
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Gets the leader retriever for the cluster's resource manager.
@@ -88,7 +114,7 @@ public interface HighAvailabilityServices {
 	 *
 	 * @return Running job registry to retrieve running jobs
 	 */
-	RunningJobsRegistry getRunningJobsRegistry();
+	RunningJobsRegistry getRunningJobsRegistry() throws Exception;
 
 	/**
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
@@ -99,11 +125,38 @@ public interface HighAvailabilityServices {
 	BlobStore createBlobStore() throws IOException;
 
 	// ------------------------------------------------------------------------
+	//  Shutdown and Cleanup
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Shut the high availability service down.
+	 * Closes the high availability services, releasing all resources.
+	 * 
+	 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
+	 * (file systems, ZooKeeper, etc). Another instance of the high availability
+	 * services will be able to recover the job.
+	 * 
+	 * <p>If an exception occurs during closing services, this method will attempt to
+	 * continue closing other services and report exceptions only after all services
+	 * have been attempted to be closed.
 	 *
-	 * @throws Exception if the shut down fails
+	 * @throws Exception Thrown, if an exception occurred while closing these services.
+	 */
+	@Override
+	void close() throws Exception;
+
+	/**
+	 * Closes the high availability services (releasing all resources) and deletes
+	 * all data stored by these services in external stores.
+	 * 
+	 * <p>After this method was called, any job or session that was managed by
+	 * these high availability services will be unrecoverable.
+	 * 
+	 * <p>If an exception occurs during cleanup, this method will attempt to
+	 * continue the cleanup and report exceptions only after all cleanup steps have
+	 * been attempted.
+	 * 
+	 * @throws Exception Thrown, if an exception occurred while closing these services
+	 *                   or cleaning up data stored by them.
 	 */
-	void shutdown() throws Exception;
+	void closeAndCleanupAllData() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 75f44ed..d644fb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
 import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
-import java.util.UUID;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -39,6 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
 
+	/** The constant name of the ResourceManager RPC endpoint */
+	private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
@@ -53,16 +54,26 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
 	}
 
 	// ------------------------------------------------------------------------
+	//  Names
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getResourceManagerEndpointName() {
+		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+	}
+
+
+	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------
 
 	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+		return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
 	}
 
 	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
-		return new StandaloneLeaderElectionService();
+		return new SingleLeaderElectionService(getExecutorService(), DEFAULT_LEADER_ID);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
new file mode 100644
index 0000000..24667e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ThreadFactory} for the threads of the Flink high-availability services.
+ *
+ * <p>Created threads are named {@code "Flink HA Services Thread #n"}, where {@code n} is a
+ * counter starting at 1. They run with {@link Thread#MAX_PRIORITY} and are daemon threads.
+ */
+public class ServicesThreadFactory implements ThreadFactory {
+
+	/** Running counter that gives each created thread a unique number. */
+	private final AtomicInteger enumerator = new AtomicInteger();
+
+	@Override
+	public Thread newThread(@Nonnull Runnable r) {
+		Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+		// HA threads should have a very high priority, but not
+		// keep the JVM running by themselves
+		thread.setPriority(Thread.MAX_PRIORITY);
+		thread.setDaemon(true);
+
+		return thread;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 3e909e8..25d21ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -108,6 +108,16 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
+	public String getResourceManagerEndpointName() {
+		// since the resource manager name must be dynamic, we return null here
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
@@ -174,10 +184,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		client.close();
 	}
 
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		close(); // TODO(review): this only closes the ZooKeeper client; data stored in ZooKeeper is not deleted, although the interface contract requires it
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..26e3cbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that handles a single
+ * leader contender. When started, this service immediately grants the contender the leadership.
+ *
+ * <p>The implementation accepts a single static leader session ID and is hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ *
+ * <p>This implementation supports a series of leader listeners that receive notifications about
+ * the leader contender.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+	// ------------------------------------------------------------------------
+
+	/** Lock for all operations on this instance. */
+	private final Object lock = new Object();
+
+	/** The executor that dispatches notifications to the contender and the listeners. */
+	private final Executor notificationExecutor;
+
+	/** The constant leader session ID assigned to the leader. */
+	private final UUID leaderId;
+
+	@GuardedBy("lock")
+	private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+	/** The currently proposed leader. */
+	@GuardedBy("lock")
+	private volatile LeaderContender proposedLeader;
+
+	/** The confirmed leader. */
+	@GuardedBy("lock")
+	private volatile LeaderContender leader;
+
+	/** The address of the confirmed leader. */
+	@GuardedBy("lock")
+	private volatile String leaderAddress;
+
+	/** Flag marking this service as shut down, meaning it cannot be started again. */
+	@GuardedBy("lock")
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new leader election service that assigns the given leader ID to the contender.
+	 *
+	 * @param notificationsDispatcher The executor that dispatches the notifications.
+	 * @param leaderId The constant leader ID assigned to the leader.
+	 */
+	public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.leaderId = checkNotNull(leaderId);
+		this.listeners = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader election service
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start(LeaderContender contender) throws Exception {
+		checkNotNull(contender, "contender");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader == null, "service already started");
+
+			// directly grant leadership to the given contender (asynchronously, via the executor)
+			proposedLeader = contender;
+			notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId));
+		}
+	}
+
+	@Override
+	public void stop() {
+		synchronized (lock) {
+			// notify all listeners that there is no leader
+			for (EmbeddedLeaderRetrievalService listener : listeners) {
+				notificationExecutor.execute(
+						new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+			}
+
+			// if there was a confirmed leader, revoke its leadership (synchronously, under the lock)
+			if (leader != null) {
+				try {
+					leader.revokeLeadership();
+				} catch (Throwable t) {
+					leader.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+				}
+			}
+
+			proposedLeader = null;
+			leader = null;
+			leaderAddress = null;
+		}
+	}
+
+	@Override
+	public void confirmLeaderSessionID(UUID leaderSessionID) {
+		checkNotNull(leaderSessionID, "leaderSessionID");
+		checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader != null, "no leader proposed yet");
+			checkState(leader == null, "leader already confirmed");
+
+			// accept the confirmation
+			final String address = proposedLeader.getAddress();
+			leaderAddress = address;
+			leader = proposedLeader;
+
+			// notify all listeners
+			for (EmbeddedLeaderRetrievalService listener : listeners) {
+				notificationExecutor.execute(
+						new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
+			}
+		}
+	}
+
+	@Override
+	public boolean hasLeadership() {
+		synchronized (lock) {
+			return leader != null;
+		}
+	}
+
+	void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+		LOG.warn("Error granting leadership to contender", error);
+		contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+
+		synchronized (lock) {
+			if (proposedLeader == contender) {
+				proposedLeader = null;
+				leader = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	public boolean isShutdown() {
+		return shutdown;
+	}
+
+	public void shutdown() {
+		shutdownInternally(new Exception("The leader service is shutting down"));
+	}
+
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+
+			shutdown = true;
+
+			// fail the leader (if there is one)
+			if (leader != null) {
+				try {
+					leader.handleError(exceptionForHandlers);
+				} catch (Throwable ignored) {} // best effort: the service is shutting down anyway
+			}
+
+			// clear all leader status
+			leader = null;
+			proposedLeader = null;
+			leaderAddress = null;
+
+			// fail all registered listeners
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				service.shutdown(exceptionForHandlers);
+			}
+			listeners.clear();
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader listeners
+	// ------------------------------------------------------------------------
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if we already have a leader, immediately notify this new listener
+				if (leader != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		volatile LeaderRetrievalListener listener;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			checkNotNull(listener);
+			addListener(this, listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		void shutdown(Exception cause) {
+			if (running) {
+				final LeaderRetrievalListener lst = listener;
+				running = false;
+				listener = null;
+
+				try {
+					lst.handleError(cause);
+				} catch (Throwable ignored) {} // best effort: the service is shutting down anyway
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This runnable informs a leader contender that it gained leadership.
+	 */
+	private class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+
+		GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				errorOnGrantLeadership(contender, t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This runnable informs a leader listener of a new leader, or that there is no leader.
+	 */
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		@Nullable
+		private final String address;       // null if leader revoked without new leader
+		@Nullable
+		private final UUID leaderSessionId; // null if leader revoked without new leader
+
+		private final LeaderRetrievalListener listener;
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 474faa8..b10e414 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -25,19 +25,17 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
-import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -132,7 +130,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
@@ -149,6 +147,12 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 		}
 	}
 
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// these services store no data, so this method is the same as 'close()'
+		close();
+	}
+
 	private void checkNotShutdown() {
 		checkState(!shutdown, "high availability services are shut down");
 	}
@@ -160,21 +164,4 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	protected ExecutorService getExecutorService() {
 		return executor;
 	}
-
-	private static final class ServicesThreadFactory implements ThreadFactory {
-
-		private AtomicInteger enumerator = new AtomicInteger();
-
-		@Override
-		public Thread newThread(@Nonnull Runnable r) {
-			Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
-
-			// HA threads should have a very high priority, but not
-			// keep the JVM running by themselves
-			thread.setPriority(Thread.MAX_PRIORITY);
-			thread.setDaemon(true);
-
-			return thread;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 9fad9be..d4eba26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -461,7 +461,7 @@ public class EmbeddedLeaderService {
 				contender.grantLeadership(leaderSessionId);
 			}
 			catch (Throwable t) {
-				logger.warn("Error notifying leader listener about new leader", t);
+				logger.warn("Error granting leadership to contender", t);
 				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index a0c608d..269a8f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 16b163c..4ad4646 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -51,6 +51,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
 	 *
 	 * @param leaderAddress The leader's pre-configured address
 	 */
+	@Deprecated
 	public StandaloneLeaderRetrievalService(String leaderAddress) {
 		this.leaderAddress = checkNotNull(leaderAddress);
 		this.leaderId = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 29a6e59..1933554 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -345,7 +345,7 @@ public class MiniCluster {
 		// shut down high-availability services
 		if (haServices != null) {
 			try {
-				haServices.shutdown();
+				haServices.closeAndCleanupAllData();
 			} catch (Exception e) {
 				exception = firstOrSuppressed(e, exception);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 56e72c0..6c7e249 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
 	/** Actions to call when the job leader changes */
 	private JobLeaderIdActions jobLeaderIdActions;
 
-	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) {
+	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 959b727..e0dee0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -46,7 +45,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
-			final MetricRegistry metricRegistry) throws ConfigurationException {
+			final MetricRegistry metricRegistry) throws Exception {
 
 		Preconditions.checkNotNull(configuration);
 		Preconditions.checkNotNull(rpcService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 1ac54ac..018c3ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -21,19 +21,36 @@ package org.apache.flink.runtime.rpc;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.NetUtils;
+
 import org.jboss.netty.channel.ChannelException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
+ * or constructing RPC addresses.
+ */
 public class RpcServiceUtils {
+
 	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
 
+	// ------------------------------------------------------------------------
+	//  RPC instantiation
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Utility method to create RPC service from configuration and hostname, port.
 	 *
@@ -78,4 +95,57 @@ public class RpcServiceUtils {
 		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
 		return new AkkaRpcService(actorSystem, timeout);
 	}
+
+	// ------------------------------------------------------------------------
+	//  RPC endpoint addressing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the RPC URL of the specified endpoint, using SSL if it is enabled in the configuration.
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param config       The configuration from which to deduce further settings.
+	 *
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config)
+			throws UnknownHostException {
+
+		checkNotNull(config, "config is null");
+
+		final boolean sslEnabled = config.getBoolean(
+					ConfigConstants.AKKA_SSL_ENABLED,
+					ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+				SSLUtils.getSSLEnabled(config);
+
+		return getRpcUrl(hostname, port, endpointName, sslEnabled);
+	}
+
+	/**
+	 * Creates the RPC URL of the specified endpoint, using the SSL protocol if {@code secure} is set.
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param secure       True, if security/encryption is enabled, false otherwise.
+	 *
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure)
+			throws UnknownHostException {
+
+		checkNotNull(hostname, "hostname is null");
+		checkNotNull(endpointName, "endpointName is null");
+		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+
+		final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
+		final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port);
+
+		return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private RpcServiceUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..f1ece0e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testCreateAndSetFinished() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String uri = folder.toURI().toString();
+
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+		// initially, without any call, the job is considered running
+		assertTrue(registry.isJobRunning(jid));
+
+		// explicitly marking the job as running should not affect the status
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// another registry on the same directory should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+		assertFalse(otherRegistry.isJobRunning(jid));
+	}
+
+	@Test
+	public void testSetFinishedAndRunning() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String uri = folder.toURI().toString();
+
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// set the job back to running and validate
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// another registry on the same directory should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+		assertTrue(otherRegistry.isJobRunning(jid));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e0f71ee..3f9865c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,12 +155,22 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		return new VoidBlobStore();
 	}
 
+	@Override
+	public String getResourceManagerEndpointName() {
+		throw new UnsupportedOperationException(); // NOTE(review): not implemented by these testing services
+	}
+
 	// ------------------------------------------------------------------------
 	//  Shutdown
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
-		// nothing to do, since this should not shut down individual services, but cross service parts
+	public void close() throws Exception {
+		// nothing to do
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// nothing to do
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..a9805a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+	private static final Random RND = new Random();
+
+	private final Executor executor = Executors.directExecutor();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testStartStopAssignLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mockContender(service);
+		final LeaderContender otherContender = mockContender(service);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+		verify(contender, times(1)).revokeLeadership();
+
+		// start with a new contender - the old contender must not gain another leadership
+		service.start(otherContender);
+		verify(otherContender, times(1)).grantLeadership(uuid);
+
+		verify(contender, times(1)).grantLeadership(uuid);
+		verify(contender, times(1)).revokeLeadership();
+	}
+
+	@Test
+	public void testStopBeforeConfirmingLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+
+		// because the leadership was never confirmed, there is no "revoke" call
+		verifyNoMoreInteractions(contender);
+	}
+
+	@Test
+	public void testStartOnlyOnce() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+		final LeaderContender otherContender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// should not be possible to start this again with another contender
+		try {
+			service.start(otherContender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// should not be possible to start this again with the same contender
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		// create a leader contender and let it grab leadership
+		final LeaderContender contender = mockContender(service);
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// some leader listeners
+		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+
+		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+
+		listenerService1.start(listener1);
+		listenerService2.start(listener2);
+
+		// one listener stops
+		listenerService1.stop();
+
+		// shut down the service
+		service.shutdown();
+
+		// the leader contender and running listener should get error notifications
+		verify(contender, times(1)).handleError(any(Exception.class));
+		verify(listener2, times(1)).handleError(any(Exception.class));
+
+		// the stopped listener gets no notification
+		verify(listener1, never()).handleError(any(Exception.class));
+
+		// should not be possible to start again after shutdown
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// no additional leadership grant
+		verify(contender, times(1)).grantLeadership(any(UUID.class));
+	}
+
+	@Test
+	public void testImmediateShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+		service.shutdown();
+
+		final LeaderContender contender = mock(LeaderContender.class);
+
+		// should not be possible to start
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// the contender must never have been granted leadership
+		verify(contender, never()).grantLeadership(any(UUID.class));
+	}
+
+//	@Test
+//	public void testNotifyListenersWhenLeaderElected() throws Exception {
+//		final UUID uuid = UUID.randomUUID();
+//		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+//
+//		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+//		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+//
+//		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+//		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+//
+//		listenerService1.start(listener1);
+//		listenerService2.start(listener2);
+//
+//		final LeaderContender contender = mockContender(service);
+//		service.start(contender);
+//
+//		// TODO: verify that both listeners were notified of the new leader, then re-enable this test
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static LeaderContender mockContender(final LeaderElectionService service) {
+		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+		return mockContender(service, address);
+	}
+
+	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+		LeaderContender mockContender = mock(LeaderContender.class);
+
+		when(mockContender.getAddress()).thenReturn(address);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final UUID uuid = (UUID) invocation.getArguments()[0];
+				service.confirmLeaderSessionID(uuid);
+				return null;
+			}
+		}).when(mockContender).grantLeadership(any(UUID.class));
+
+		return mockContender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 18ffe13..f69e0e4 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -32,6 +32,9 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_2.10</artifactId>
@@ -91,6 +94,8 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_2.10</artifactId>
@@ -98,6 +103,22 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 923694e..1c8bad7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -23,17 +23,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * This class is the executable entry point for the YARN application master.
@@ -95,7 +97,18 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
 					currentUser.getShortUserName(), yarnClientUsername );
 
-			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			if(keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -108,18 +121,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 				sc.setHadoopConfiguration(conf);
 			}
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
-
-			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
-			if(keytabPath != null && remoteKeytabPrincipal != null) {
-				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
-			}
-
-			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+			SecurityUtils.install(sc);
 
 			// Note that we use the "appMasterHostname" given by YARN here, to make sure
 			// we use the hostnames given by YARN consistently throughout akka.
@@ -129,9 +131,9 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 					"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
 			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer run() {
+				public Integer call() throws Exception {
 					return runApplicationMaster(flinkConfig);
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1826d43..d1ef553 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -115,7 +115,7 @@ public class YarnApplicationMasterRunner {
 	 * @param args The command line arguments.
 	 */
 	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e58c77e..188d9ef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -57,7 +56,12 @@ import java.io.FileInputStream;
 import java.io.ObjectInputStream;
 
 /**
- * This class is the executable entry point for the YARN application master.
+ * This class is the executable entry point for the YARN Application Master that
+ * executes a single Flink job and then shuts the YARN application down.
+ *
+ * <p>The lifetime of the YARN application is bound to that of the Flink job. Other
+ * YARN Application Master implementations include, for example, the YARN session.
+ *
  * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
  * and {@link org.apache.flink.yarn.YarnResourceManager}.
  *
@@ -74,6 +78,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	/** The job graph file path */
 	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
 
+	// ------------------------------------------------------------------------
+
 	/** The lock to guard startup / shutdown / manipulation methods */
 	private final Object lock = new Object();
 
@@ -105,7 +111,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	 * @param args The command line arguments.
 	 */
 	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
@@ -127,7 +133,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
 
 			synchronized (lock) {
+				LOG.info("Starting High Availability Services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+				
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
 
@@ -176,7 +184,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
-	private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
 		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
@@ -242,7 +250,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			}
 			if (haServices != null) {
 				try {
-					haServices.shutdown();
+					haServices.close();
 				} catch (Throwable tt) {
 					LOG.warn("Failed to stop the HA service", tt);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index d9912eb..dc8c604 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * This class is the executable entry point for running a TaskExecutor in a YARN container.
@@ -138,7 +139,7 @@ public class YarnTaskExecutorRunner {
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
 					currentUser.getShortUserName(), yarnClientUsername);
 
-			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -156,11 +157,11 @@ public class YarnTaskExecutorRunner {
 				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
 			}
 
-			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+			SecurityUtils.install(sc);
 
-			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+			return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 				@Override
-				public Integer run() {
+				public Integer call() throws Exception {
 					return runTaskExecutor(configuration);
 				}
 			});
@@ -240,7 +241,7 @@ public class YarnTaskExecutorRunner {
 			}
 			if (haServices != null) {
 				try {
-					haServices.shutdown();
+					haServices.close();
 				} catch (Throwable tt) {
 					LOG.warn("Failed to stop the HA service", tt);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
new file mode 100644
index 0000000..c3902d3
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by Flink's YARN runners.
+ * These options are not expected to ever be configured by users explicitly.
+ */
+public final class YarnConfigOptions {
+
+	/**
+	 * The hostname or address where the application master RPC system is listening.
+	 */
+	public static final ConfigOption<String> APP_MASTER_RPC_ADDRESS =
+			key("yarn.appmaster.rpc.address")
+			.noDefaultValue();
+
+	/**
+	 * The port where the application master RPC system is listening (key must differ from {@link #APP_MASTER_RPC_ADDRESS}).
+	 */
+	public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
+			key("yarn.appmaster.rpc.port")
+			.defaultValue(-1);
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated */
+	private YarnConfigOptions() {}
+}