Posted to commits@flink.apache.org by se...@apache.org on 2016/12/13 20:29:43 UTC

[1/2] flink git commit: [FLINK-5254] [yarn] Implement YARN High-Availability Services

Repository: flink
Updated Branches:
  refs/heads/flip-6 8c448e8f5 -> 1a2d4b368


http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
new file mode 100644
index 0000000..4c78726
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The basis of {@link HighAvailabilityServices} for YARN setups.
+ * These high-availability services auto-configure YARN's HDFS and the YARN application's
+ * working directory to be used to store job recovery data.
+ * 
+ * <p>Note for implementers: This class locks access to and creation of services,
+ * to make sure all services are properly shut down when shutting down this class.
+ * To participate in the checks, overriding methods should frame method body with
+ * calls to {@code enter()} and {@code exit()} as shown in the following pattern:
+ * 
+ * <pre>{@code
+ * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ *     enter();
+ *     try {
+ *         CuratorClient client = getCuratorClient();
+ *         return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+ *     } finally {
+ *         exit();
+ *     }
+ * }
+ * }</pre>
+ */
+public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices {
+
+	/** The name of the subdirectory in which Flink stores the recovery data. */
+	public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data";
+
+	/** Logger for these services, shared with subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The lock that guards all accesses to methods in this class */
+	private final ReentrantLock lock;
+
+	/** The Flink FileSystem object that represents the HDFS used by YARN. */
+	protected final FileSystem flinkFileSystem;
+
+	/** The Hadoop FileSystem object that represents the HDFS used by YARN. */
+	protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
+
+	/** The working directory of this YARN application.
+	 * This MUST NOT be deleted when the HA services clean up */
+	protected final Path workingDirectory;
+
+	/** The directory for HA persistent data. This should be deleted when the
+	 * HA services clean up */
+	protected final Path haDataDirectory;
+
+	/** Flag marking this instance as shut down */
+	private volatile boolean closed;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YARN high-availability services, configuring the file system and recovery
+	 * data directory based on the working directory in the given Hadoop configuration.
+	 * 
+	 * <p>This class requires that the default Hadoop file system configured in the given
+	 * Hadoop configuration is an HDFS.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 * 
+	 * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 */
+	protected YarnHighAvailabilityServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		checkNotNull(config);
+		checkNotNull(hadoopConf);
+
+		this.lock = new ReentrantLock();
+
+		// get and verify the YARN HDFS URI
+		final URI fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConf);
+		if (fsUri.getScheme() == null || !"hdfs".equals(fsUri.getScheme().toLowerCase())) {
+			throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
+					"Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
+		}
+
+		// initialize the Hadoop File System
+		// we go through this special code path here to make sure we get no shared cached
+		// instance of the FileSystem
+		try {
+			final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass =
+					org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf);
+
+			this.hadoopFileSystem = InstantiationUtil.instantiate(fsClass);
+			this.hadoopFileSystem.initialize(fsUri, hadoopConf);
+		}
+		catch (Exception e) {
+			throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
+		}
+
+		this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+
+		this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
+		this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
+
+		// test the file system, to make sure we fail fast if access does not work
+		try {
+			flinkFileSystem.mkdirs(haDataDirectory);
+		}
+		catch (Exception e) {
+			throw new IOException("Could not create the directory for recovery data in YARN's file system at '"
+					+ haDataDirectory + "'.", e);
+		}
+
+		LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  high availability services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		enter();
+		try {
+			return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+		} finally {
+			exit();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether these services have been shut down.
+	 *
+	 * @return True, if this instance has been shut down, false if it is still operational.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws Exception {
+		lock.lock();
+		try {
+			// close only once
+			if (closed) {
+				return;
+			}
+			closed = true;
+
+			// we do not propagate exceptions here, but only log them
+			try {
+				hadoopFileSystem.close();
+			} catch (Throwable t) {
+				LOG.warn("Error closing Hadoop FileSystem", t);
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		lock.lock();
+		try {
+			checkState(!closed, "YarnHighAvailabilityServices are already closed");
+
+			// we remember exceptions only, then continue cleanup, and re-throw at the end
+			Throwable exception = null;
+
+			// first, we delete all data in Flink's data directory
+			try {
+				flinkFileSystem.delete(haDataDirectory, true);
+			}
+			catch (Throwable t) {
+				exception = t;
+			}
+
+			// now we actually close the services
+			try {
+				close();
+			}
+			catch (Throwable t) {
+				exception = firstOrSuppressed(t, exception);
+			}
+
+			// if some exception occurred, rethrow
+			if (exception != null) {
+				ExceptionUtils.rethrowException(exception, exception.getMessage());
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * To be called at the beginning of every method that creates an HA service. Acquires the lock
+	 * and checks whether this HighAvailabilityServices instance is shut down.
+	 */
+	void enter() {
+		if (!enterUnlessClosed()) {
+			throw new IllegalStateException("closed");
+		}
+	}
+
+	/**
+	 * Acquires the lock and checks whether the services are already closed. If they are
+	 * already closed, the method releases the lock and returns {@code false}.
+	 * 
+	 * @return True, if the lock was acquired and the services are not closed, false if the services are closed.
+	 */
+	boolean enterUnlessClosed() {
+		lock.lock();
+		if (!closed) {
+			return true;
+		} else {
+			lock.unlock();
+			return false;
+		}
+	}
+
+	/**
+	 * To be called at the end of every method that creates an HA service. Releases the lock.
+	 */
+	void exit() {
+		lock.unlock();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory from Configuration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the high-availability services for a single-job Flink YARN application, to be
+	 * used in the Application Master that runs both ResourceManager and JobManager.
+	 * 
+	 * @param flinkConfig  The Flink configuration.
+	 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+	 * 
+	 * @return The created high-availability services.
+	 * 
+	 * @throws IOException Thrown, if the high-availability services could not be initialized.
+	 */
+	public static YarnHighAvailabilityServices forSingleJobAppMaster(
+			Configuration flinkConfig,
+			org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+		checkNotNull(flinkConfig, "flinkConfig");
+		checkNotNull(hadoopConfig, "hadoopConfig");
+
+		final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
+		switch (mode) {
+			case NONE:
+				return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException("to be implemented");
+
+			default:
+				throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+		}
+	}
+
+	/**
+	 * Creates the high-availability services for the TaskManagers participating in
+	 * a Flink YARN application.
+	 *
+	 * @param flinkConfig  The Flink configuration.
+	 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+	 *
+	 * @return The created high-availability services.
+	 *
+	 * @throws IOException Thrown, if the high-availability services could not be initialized.
+	 */
+	public static YarnHighAvailabilityServices forYarnTaskManager(
+			Configuration flinkConfig,
+			org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+		checkNotNull(flinkConfig, "flinkConfig");
+		checkNotNull(hadoopConfig, "hadoopConfig");
+
+		final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
+		switch (mode) {
+			case NONE:
+				return new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException("to be implemented");
+
+			default:
+				throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+		}
+	}
+}
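
For orientation, a minimal sketch (not part of this commit) of how these factory methods might be used from a runner process; the class name and the surrounding startup logic are illustrative only:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    import org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices;

    public class YarnHaBootstrapSketch {

        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            org.apache.hadoop.conf.Configuration hadoopConfig =
                    new org.apache.hadoop.conf.Configuration();

            // inside the Application Master of a single-job deployment
            HighAvailabilityServices haServices =
                    YarnHighAvailabilityServices.forSingleJobAppMaster(flinkConfig, hadoopConfig);
            try {
                // ... start ResourceManager / JobManager against haServices ...
            }
            finally {
                // deletes the recovery data directory and closes the services
                haServices.closeAndCleanupAllData();
            }
        }
    }

TaskManagers would obtain their services via YarnHighAvailabilityServices.forYarnTaskManager(flinkConfig, hadoopConfig) instead.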

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
new file mode 100644
index 0000000..fd1a45e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * These YarnHighAvailabilityServices are for the Application Master in setups where there is one
+ * ResourceManager that is statically configured in the Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>Because ResourceManager and JobManager both run in the same process (the Application Master), they
+ * use an embedded leader election service to find each other.
+ * 
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
+
+	/** The dispatcher thread pool for these services */
+	private final ExecutorService dispatcher;
+
+	/** The embedded leader election service used by JobManagers to find the resource manager */
+	private final SingleLeaderElectionService resourceManagerLeaderElectionService;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration.
+	 * 
+	 * This constructor initializes access to the HDFS to store recovery data, and creates the
+	 * embedded leader election services through which ResourceManager and JobManager find and
+	 * confirm each other.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException
+	 *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 * @throws IllegalConfigurationException
+	 *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+	 */
+	public YarnIntraNonHaMasterServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		super(config, hadoopConf);
+
+		// track whether we successfully perform the initialization
+		boolean successful = false;
+
+		try {
+			this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
+			this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
+
+			// all good!
+			successful = true;
+		}
+		finally {
+			if (!successful) {
+				// quietly undo what the parent constructor initialized
+				try {
+					super.close();
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		enter();
+		try {
+			return resourceManagerLeaderElectionService.createLeaderRetrievalService();
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		enter();
+		try {
+			return resourceManagerLeaderElectionService;
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void close() throws Exception {
+		if (enterUnlessClosed()) {
+			try {
+				try {
+					// this class' own cleanup logic
+					resourceManagerLeaderElectionService.shutdown();
+					dispatcher.shutdownNow();
+				}
+				finally {
+					// in any case must we call the parent cleanup logic
+					super.close();
+				}
+			}
+			finally {
+				exit();
+			}
+		}
+	}
+}
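
As a rough sketch of the wiring this enables inside the Application Master (the method and parameter names below are illustrative, not part of this commit): the ResourceManager registers as a contender with the embedded election service, and the JobManager side learns that leader's address through the matching retrieval service:

    import org.apache.flink.runtime.leaderelection.LeaderContender;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    import org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServices;

    public final class IntraAmWiringSketch {

        public static void wire(
                YarnIntraNonHaMasterServices services,
                LeaderContender resourceManagerContender,
                LeaderRetrievalListener jobManagerSideListener) throws Exception {

            // the single ResourceManager competes for (and receives) leadership
            services.getResourceManagerLeaderElectionService().start(resourceManagerContender);

            // the JobManager side is notified of that leader's address in-process
            services.getResourceManagerLeaderRetriever().start(jobManagerSideListener);
        }
    }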

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
new file mode 100644
index 0000000..eb4b77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import java.io.IOException;
+
+/**
+ * These YarnHighAvailabilityServices are for use by the TaskManager in setups
+ * where there is one ResourceManager that is statically configured in the Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices {
+
+	/** The RPC URL under which the single ResourceManager can be reached while available */ 
+	private final String resourceManagerRpcUrl;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YarnPreConfiguredMasterNonHaServices for the given Flink and YARN configuration.
+	 * This constructor parses the ResourceManager address from the Flink configuration and sets
+	 * up the HDFS access to store recovery data in the YARN application's working directory.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException
+	 *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 * @throws IllegalConfigurationException
+	 *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+	 */
+	public YarnPreConfiguredMasterNonHaServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		super(config, hadoopConf);
+
+		// track whether we successfully perform the initialization
+		boolean successful = false;
+
+		try {
+			// extract the hostname and port of the resource manager
+			final String rmHost = config.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS);
+			final int rmPort = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT);
+
+			if (rmHost == null) {
+				throw new IllegalConfigurationException("Config parameter '" + 
+						YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing.");
+			}
+			if (rmPort < 0) {
+				throw new IllegalConfigurationException("Config parameter '" +
+						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing.");
+			}
+			if (rmPort <= 0 || rmPort >= 65536) {
+				throw new IllegalConfigurationException("Invalid value for '" + 
+						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]");
+			}
+
+			this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
+					rmHost, rmPort, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+
+			// all well!
+			successful = true;
+		}
+		finally {
+			if (!successful) {
+				// quietly undo what the parent constructor initialized
+				try {
+					super.close();
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		enter();
+		try {
+			return new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID);
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		enter();
+		try {
+			throw new UnsupportedOperationException("Not supported on the TaskManager side");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+}
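
For the TaskManager side that these services target, a hedged sketch (class name and the address/port values are placeholders, not part of this commit) of how the statically configured ResourceManager address flows through them:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.yarn.configuration.YarnConfigOptions;
    import org.apache.flink.yarn.highavailability.YarnPreConfiguredMasterNonHaServices;

    public final class TaskManagerHaSketch {

        public static LeaderRetrievalService startRmRetriever(
                org.apache.hadoop.conf.Configuration hadoopConfig,
                LeaderRetrievalListener listener) throws Exception {

            // in a real deployment the Application Master writes these before starting TaskManagers
            Configuration flinkConfig = new Configuration();
            flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "appmaster-host");
            flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);

            YarnPreConfiguredMasterNonHaServices services =
                    new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);

            // a StandaloneLeaderRetrievalService that immediately reports the configured address
            LeaderRetrievalService retriever = services.getResourceManagerLeaderRetriever();
            retriever.start(listener);
            return retriever;
        }
    }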

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
new file mode 100644
index 0000000..0e7bf0f
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class YarnIntraNonHaMasterServicesTest {
+
+	private static final Random RND = new Random();
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testRepeatedClose() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+		services.closeAndCleanupAllData();
+
+		// this should not throw an exception
+		services.close();
+	}
+
+	@Test
+	public void testClosingReportsToLeader() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
+			final LeaderElectionService elector = services.getResourceManagerLeaderElectionService();
+			final LeaderContender contender = mockContender(elector);
+
+			elector.start(contender);
+			services.close();
+
+			verify(contender, timeout(100).times(1)).handleError(any(Exception.class));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static LeaderContender mockContender(final LeaderElectionService service) {
+		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+		return mockContender(service, address);
+	}
+
+	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+		LeaderContender mockContender = mock(LeaderContender.class);
+
+		when(mockContender.getAddress()).thenReturn(address);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final UUID uuid = (UUID) invocation.getArguments()[0];
+				service.confirmLeaderSessionID(uuid);
+				return null;
+			}
+		}).when(mockContender).grantLeadership(any(UUID.class));
+
+		return mockContender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
new file mode 100644
index 0000000..a13deac
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.*;
+
+
+public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testConstantResourceManagerName() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+		YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+		try {
+			String rmName1 = services1.getResourceManagerEndpointName();
+			String rmName2 = services2.getResourceManagerEndpointName();
+
+			assertNotNull(rmName1);
+			assertNotNull(rmName2);
+			assertEquals(rmName1, rmName2);
+		}
+		finally {
+			services1.closeAndCleanupAllData();
+			services2.closeAndCleanupAllData();
+		}
+	}
+
+	@Test
+	public void testMissingRmConfiguration() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		// missing resource manager address
+		try {
+			new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+			fail();
+		} catch (IllegalConfigurationException e) {
+			// expected
+		}
+
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+
+		// missing resource manager port
+		try {
+			new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+			fail();
+		} catch (IllegalConfigurationException e) {
+			// expected
+		}
+
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		// now everything is good ;-)
+		new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig).closeAndCleanupAllData();
+	}
+
+	@Test
+	public void testCloseAndCleanup() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		// create the services
+		YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+		services.closeAndCleanupAllData();
+
+		final FileSystem fileSystem = HDFS_ROOT_PATH.getFileSystem();
+		final Path workDir = new Path(HDFS_CLUSTER.getFileSystem().getWorkingDirectory().toString());
+		
+		try {
+			fileSystem.getFileStatus(new Path(workDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR));
+			fail("Flink recovery data directory still exists");
+		}
+		catch (FileNotFoundException e) {
+			// expected, because the directory should have been cleaned up
+		}
+
+		assertTrue(services.isClosed());
+
+		// doing another cleanup when the services are closed should fail
+		try {
+			services.closeAndCleanupAllData();
+			fail("should fail with an IllegalStateException");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCallsOnClosedServices() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+		// this method is not supported
+		try {
+			services.getSubmittedJobGraphStore();
+			fail();
+		} catch (UnsupportedOperationException ignored) {}
+
+
+		services.close();
+
+		// all these methods should fail now
+
+		try {
+			services.createBlobStore();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getCheckpointRecoveryFactory();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getJobManagerLeaderElectionService(new JobID());
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getJobManagerLeaderRetriever(new JobID());
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getRunningJobsRegistry();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getResourceManagerLeaderElectionService();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getResourceManagerLeaderRetriever();
+			fail();
+		} catch (IllegalStateException ignored) {}
+	}
+}


[2/2] flink git commit: [FLINK-5254] [yarn] Implement YARN High-Availability Services

Posted by se...@apache.org.
[FLINK-5254] [yarn] Implement YARN High-Availability Services


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a2d4b36
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a2d4b36
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a2d4b36

Branch: refs/heads/flip-6
Commit: 1a2d4b36809bfa0bc6c993a4f4cb4eda34f82ee2
Parents: 8c448e8
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 5 01:34:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 13 13:53:46 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  12 +-
 .../FsNegativeRunningJobsRegistryTest.java      | 121 ++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  20 +-
 .../highavailability/EmbeddedNonHaServices.java |  15 +-
 .../FsNegativeRunningJobsRegistry.java          | 153 ++++++++
 .../HighAvailabilityServices.java               |  69 +++-
 .../runtime/highavailability/NonHaServices.java |  21 +-
 .../highavailability/ServicesThreadFactory.java |  40 ++
 .../highavailability/ZookeeperHaServices.java   |  17 +-
 .../SingleLeaderElectionService.java            | 384 +++++++++++++++++++
 .../nonha/AbstractNonHaServices.java            |  29 +-
 .../nonha/EmbeddedLeaderService.java            |   2 +-
 .../StandaloneLeaderRetrievalService.java       |   1 +
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManagerRunner.java  |   3 +-
 .../flink/runtime/rpc/RpcServiceUtils.java      |  70 ++++
 .../FsNegativeRunningJobsRegistryTest.java      |  85 ++++
 .../TestingHighAvailabilityServices.java        |  14 +-
 .../SingleLeaderElectionServiceTest.java        | 226 +++++++++++
 flink-yarn/pom.xml                              |  53 ++-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  18 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |   2 +-
 .../yarn/configuration/YarnConfigOptions.java   |  49 +++
 .../AbstractYarnNonHaServices.java              | 105 +++++
 .../YarnHighAvailabilityServices.java           | 343 +++++++++++++++++
 .../YarnIntraNonHaMasterServices.java           | 188 +++++++++
 .../YarnPreConfiguredMasterNonHaServices.java   | 172 +++++++++
 .../YarnIntraNonHaMasterServicesTest.java       | 149 +++++++
 .../YarnPreConfiguredMasterHaServicesTest.java  | 234 +++++++++++
 31 files changed, 2518 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index f15c669..8f23435 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -44,7 +44,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		implements IOReadableWritable, java.io.Serializable, Cloneable {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final byte TYPE_STRING = 0;
 	private static final byte TYPE_INT = 1;
 	private static final byte TYPE_LONG = 2;
@@ -52,14 +52,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	private static final byte TYPE_FLOAT = 4;
 	private static final byte TYPE_DOUBLE = 5;
 	private static final byte TYPE_BYTES = 6;
-	
+
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
-	
+
 
 	/** Stores the concrete key/value pairs of this configuration object. */
 	protected final HashMap<String, Object> confData;
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -639,12 +639,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		Object o = getRawValue(configOption.key());
 
 		if (o != null) {
+			// found a value for the current proper key
 			return o;
 		}
 		else if (configOption.hasDeprecatedKeys()) {
+			// try the deprecated keys
 			for (String deprecatedKey : configOption.deprecatedKeys()) {
 				Object oo = getRawValue(deprecatedKey);
 				if (oo != null) {
+					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", 
+							deprecatedKey, configOption.key());
 					return oo;
 				}
 			}
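
The deprecated-key fallback added above is exercised when a ConfigOption declares deprecated keys. A small hedged sketch follows; the option and key names are made up for illustration, and the withDeprecatedKeys builder call is assumed to be available on ConfigOption in this version:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public final class DeprecatedKeySketch {

        // an option that still honors an older key; resolving it via the old key logs the new warning
        // (withDeprecatedKeys(...) is assumed to exist on ConfigOption here)
        private static final ConfigOption<String> RECOVERY_DIR = ConfigOptions
                .key("high-availability.storageDir")
                .defaultValue("hdfs:///flink/recovery")
                .withDeprecatedKeys("recovery.zookeeper.storageDir");

        public static String resolve(Configuration config) {
            return config.getString(RECOVERY_DIR);
        }
    }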

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..40d75e8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	// ------------------------------------------------------------------------
+	//  startup / shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+
+		HDFS_ROOT_PATH = new Path("hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
+				+ HDFS_CLUSTER.getNameNodePort() + "/");
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCreateAndSetFinished() throws Exception {
+		final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+		// initially, without any call, the job is considered running
+		assertTrue(registry.isJobRunning(jid));
+
+		// repeated setting should not affect the status
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+		assertFalse(otherRegistry.isJobRunning(jid));
+	}
+
+	@Test
+	public void testSetFinishedAndRunning() throws Exception {
+		final Path workDir = new Path(HDFS_ROOT_PATH, "änother_wörk_directörü");
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// set the job to back to running and validate
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+		assertTrue(otherRegistry.isJobRunning(jid));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5d7173b..8024c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.hadoop.conf.Configuration;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
  * class is a wrapper class which encapsulated the original Hadoop HDFS API.
@@ -62,7 +64,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 
 	/**
-	 * Creates a new DistributedFileSystem object to access HDFS
+	 * Creates a new DistributedFileSystem object to access HDFS, based on a class name
+	 * and picking up the configuration from the class path or the Flink configuration.
 	 * 
 	 * @throws IOException
 	 *         throw if the required HDFS classes cannot be instantiated
@@ -78,6 +81,21 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		this.fs = instantiateFileSystem(fsClass);
 	}
 
+	/**
+	 * Creates a new DistributedFileSystem that uses the given Hadoop
+	 * {@link org.apache.hadoop.fs.FileSystem} under the hood.
+	 *
+	 * @param hadoopConfig The Hadoop configuration that the FileSystem is based on.
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public HadoopFileSystem(
+			org.apache.hadoop.conf.Configuration hadoopConfig,
+			org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+
+		this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+	}
+
 	private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
 		Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
 

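The snippet above mirrors how YarnHighAvailabilityServices uses this new constructor. A hedged sketch (helper class name is illustrative, not part of this commit) of wrapping a freshly initialized, non-cached Hadoop file system:

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

    import java.io.IOException;
    import java.net.URI;

    public final class UncachedHadoopFsSketch {

        public static FileSystem wrap(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConf)
                throws IOException {

            final org.apache.hadoop.fs.FileSystem hadoopFs;
            try {
                // bypasses Hadoop's FileSystem.get() cache, as the YARN HA services do
                Class<? extends org.apache.hadoop.fs.FileSystem> fsClass =
                        org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf);
                hadoopFs = fsClass.newInstance();
                hadoopFs.initialize(fsUri, hadoopConf);
            }
            catch (Exception e) {
                throw new IOException("Cannot instantiate a file system for " + fsUri, e);
            }

            // the constructor added by this commit
            return new HadoopFileSystem(hadoopConf, hadoopFs);
        }
    }
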
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index b91cec1..a417599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,6 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
+	public String getResourceManagerEndpointName() {
+		// dynamic actor name
+		return null;
+	}
+
+	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return resourceManagerLeaderService.createLeaderRetrievalService();
 	}
@@ -55,11 +61,16 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		try {
-			super.shutdown();
+			super.close();
 		} finally {
 			resourceManagerLeaderService.shutdown();
 		}
 	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
new file mode 100644
index 0000000..9d8b226
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@link RunningJobsRegistry} tracks the status of jobs via marker files,
+ * marking a job as finished by creating its marker file.
+ * 
+ * <p>The general contract is the following:
+ * <ul>
+ *     <li>Initially, a marker file does not exist (no one created it, yet), which means
+ *         the specific job is assumed to be running</li>
+ *     <li>The JobManager that finishes the job calls this service to create the marker file,
+ *         which marks the job as finished.</li>
+ *     <li>If a JobManager gains leadership at some point when shutdown is in progress,
+ *         it will see the marker file and realize that the job is finished.</li>
+ *     <li>The application framework is expected to clean the file once the application
+ *         is completely shut down. At that point, no JobManager will attempt to
+ *         start the job, even if it gains leadership.</li>
+ * </ul>
+ * 
+ * <p>It is especially tailored towards deployment modes such as YARN, where
+ * HDFS is available as a persistent file system and the YARN application's
+ * working directories on HDFS are automatically cleaned up after the
+ * application has completed.
+ */
+public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
+
+	private static final String PREFIX = ".job_complete_";
+
+	private final FileSystem fileSystem;
+
+	private final Path basePath;
+
+	/**
+	 * Creates a new registry that writes to the FileSystem and working directory
+	 * denoted by the given path.
+	 * 
+	 * <p>The initialization will attempt to write to the given working directory, in
+	 * order to catch setup/configuration errors early.
+	 *
+	 * @param workingDirectory The working directory for files to track the job status.
+	 *
+	 * @throws IOException Thrown, if the specified directory cannot be accessed.
+	 */
+	public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException {
+		this(workingDirectory.getFileSystem(), workingDirectory);
+	}
+
+	/**
+	 * Creates a new registry that writes its files to the given FileSystem at
+	 * the given working directory path.
+	 * 
+	 * <p>The initialization will attempt to write to the given working directory, in
+	 * order to catch setup/configuration errors early.
+	 *
+	 * @param fileSystem The FileSystem to use for the marker files.
+	 * @param workingDirectory The working directory for files to track the job status.
+	 *
+	 * @throws IOException Thrown, if the specified directory cannot be accessed.
+	 */
+	public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException {
+		this.fileSystem = checkNotNull(fileSystem, "fileSystem");
+		this.basePath = checkNotNull(workingDirectory, "workingDirectory");
+
+		// to be safe, attempt to write to the working directory, to
+		// catch problems early
+		final Path testFile = new Path(workingDirectory, ".registry_test");
+		try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
+			out.write(42);
+		}
+		catch (IOException e) {
+			throw new IOException("Unable to write to working directory: " + workingDirectory, e);
+		}
+		finally {
+			fileSystem.delete(testFile, false);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+		final Path filePath = createMarkerFilePath(jobID);
+
+		// delete the marker file, if it exists
+		try {
+			fileSystem.delete(filePath, false);
+		}
+		catch (FileNotFoundException e) {
+			// apparently job was already considered running
+		}
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+		final Path filePath = createMarkerFilePath(jobID);
+
+		// create the marker file
+		// to avoid an exception if the file already exists, set overwrite=true
+		try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+			out.write(42);
+		}
+	}
+
+	@Override
+	public boolean isJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID, "jobID");
+
+		// check for the existence of the file
+		try {
+			fileSystem.getFileStatus(createMarkerFilePath(jobID));
+			// file was found --> job is terminated
+			return false;
+		}
+		catch (FileNotFoundException e) {
+			// file does not exist, job is still running
+			return true;
+		}
+	}
+
+	private Path createMarkerFilePath(JobID jobId) {
+		return new Path(basePath, PREFIX + jobId.toString());
+	}
+}

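For illustration, a minimal sketch of how this negative registry behaves end to end. The class name, the local path, and the main() scaffolding below are assumptions for the example only and are not part of this patch; under YARN the working directory would be the application's directory on HDFS.

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;

public class FsRegistrySketch {
	public static void main(String[] args) throws Exception {
		// hypothetical working directory; under YARN this would be the
		// application's working directory on HDFS
		final Path workingDir = new Path("file:///tmp/flink-registry-sketch");

		final FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workingDir);
		final JobID jobId = new JobID();

		// no marker file exists yet, so the job is assumed to be running
		System.out.println("running: " + registry.isJobRunning(jobId)); // true

		// a finishing JobManager writes the marker file
		registry.setJobFinished(jobId);

		// a JobManager that gains leadership later sees the marker file
		System.out.println("running: " + registry.isJobRunning(jobId)); // false
	}
}
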
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 360de7b..4169204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -26,19 +26,45 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.UUID;
 
 /**
- * This class gives access to all services needed for
- *
+ * The HighAvailabilityServices give access to all services needed for a highly-available
+ * setup. In particular, the services provide access to highly available storage and
+ * registries, as well as distributed counters and leader election.
+ * 
  * <ul>
  *     <li>ResourceManager leader election and leader retrieval</li>
  *     <li>JobManager leader election and leader retrieval</li>
  *     <li>Persistence for checkpoint metadata</li>
  *     <li>Registering the latest completed checkpoint(s)</li>
- *     <li>Persistence for submitted job graph</li>
+ *     <li>Persistence for the BLOB store</li>
+ *     <li>Registry that marks a job's status</li>
+ *     <li>Naming of RPC endpoints</li>
  * </ul>
  */
-public interface HighAvailabilityServices {
+public interface HighAvailabilityServices extends AutoCloseable {
+
+	// ------------------------------------------------------------------------
+	//  Constants
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This UUID should be used when no proper leader election happens, but a simple
+	 * pre-configured leader is used. That is for example the case in non-highly-available
+	 * standalone setups.
+	 */
+	UUID DEFAULT_LEADER_ID = new UUID(0, 0);
+
+	// ------------------------------------------------------------------------
+	//  Endpoint Naming
+	// ------------------------------------------------------------------------
+
+	String getResourceManagerEndpointName();
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Gets the leader retriever for the cluster's resource manager.
@@ -88,7 +114,7 @@ public interface HighAvailabilityServices {
 	 *
 	 * @return Running job registry to retrieve running jobs
 	 */
-	RunningJobsRegistry getRunningJobsRegistry();
+	RunningJobsRegistry getRunningJobsRegistry() throws Exception;
 
 	/**
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
@@ -99,11 +125,38 @@ public interface HighAvailabilityServices {
 	BlobStore createBlobStore() throws IOException;
 
 	// ------------------------------------------------------------------------
+	//  Shutdown and Cleanup
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Shut the high availability service down.
+	 * Closes the high availability services, releasing all resources.
+	 * 
+	 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
+	 * (file systems, ZooKeeper, etc). Another instance of the high availability
+	 * services will be able to recover the job.
+	 * 
+	 * <p>If an exception occurs while closing one of the services, this method will
+	 * still attempt to close the remaining services and will report the exceptions
+	 * only after it has attempted to close all of them.
 	 *
-	 * @throws Exception if the shut down fails
+	 * @throws Exception Thrown, if an exception occurred while closing these services.
+	 */
+	@Override
+	void close() throws Exception;
+
+	/**
+	 * Closes the high availability services (releasing all resources) and deletes
+	 * all data stored by these services in external stores.
+	 * 
+	 * <p>After this method has been called, any job or session that was managed by
+	 * these high availability services will be unrecoverable.
+	 * 
+	 * <p>If an exception occurs during cleanup, this method will attempt to
+	 * continue the cleanup and report exceptions only after all cleanup steps have
+	 * been attempted.
+	 * 
+	 * @throws Exception Thrown, if an exception occurred while closing these services
+	 *                   or cleaning up data stored by them.
 	 */
-	void shutdown() throws Exception;
+	void closeAndCleanupAllData() throws Exception;
 }

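A brief sketch of how a caller might pick between the two shutdown paths of this interface. The helper class and the clean-termination flag are placeholders for whatever the surrounding framework code decides, not part of this change.

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;

public final class HaShutdownSketch {

	/**
	 * Releases the HA services; only a clean, final termination also removes
	 * the recovery data, so that a failed instance can still be recovered later.
	 */
	public static void shutdown(HighAvailabilityServices haServices, boolean terminatedCleanly) throws Exception {
		if (terminatedCleanly) {
			// releases resources AND deletes the data in ZooKeeper / the file system;
			// the job or session can no longer be recovered afterwards
			haServices.closeAndCleanupAllData();
		} else {
			// releases resources only; another instance can still recover the job
			haServices.close();
		}
	}
}
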
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 75f44ed..d644fb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
 import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
-import java.util.UUID;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -39,6 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
 
+	/** The constant name of the ResourceManager RPC endpoint */
+	private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
@@ -53,16 +54,26 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
 	}
 
 	// ------------------------------------------------------------------------
+	//  Names
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getResourceManagerEndpointName() {
+		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+	}
+
+
+	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------
 
 	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+		return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
 	}
 
 	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
-		return new StandaloneLeaderElectionService();
+		return new SingleLeaderElectionService(getExecutorService(), DEFAULT_LEADER_ID);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
new file mode 100644
index 0000000..24667e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ServicesThreadFactory implements ThreadFactory {
+
+	private AtomicInteger enumerator = new AtomicInteger();
+
+	@Override
+	public Thread newThread(@Nonnull Runnable r) {
+		Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+		// HA threads should have a very high priority, but not
+		// keep the JVM running by themselves
+		thread.setPriority(Thread.MAX_PRIORITY);
+		thread.setDaemon(true);
+
+		return thread;
+	}
+}

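For context, a minimal sketch of plugging this factory into a standard JDK executor; the Runnable below is only a placeholder to show that the worker threads are daemon threads with maximum priority.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.runtime.highavailability.ServicesThreadFactory;

public class HaExecutorSketch {
	public static void main(String[] args) {
		// single-threaded executor whose worker is created by ServicesThreadFactory
		ExecutorService executor = Executors.newSingleThreadExecutor(new ServicesThreadFactory());

		executor.execute(new Runnable() {
			@Override
			public void run() {
				// prints "Flink HA Services Thread #1"
				System.out.println(Thread.currentThread().getName());
			}
		});

		executor.shutdown();
	}
}
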
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 07c5011..bf0b970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -107,6 +107,16 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
+	public String getResourceManagerEndpointName() {
+		// since the resource manager name must be dynamic, we return null here
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
 	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
@@ -173,10 +183,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		client.close();
 	}
 
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		close();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..26e3cbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that handles a single
+ * leader contender. When started, this service immediately grants the contender the leadership.
+ * 
+ * <p>The implementation accepts a single static leader session ID and is hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ * 
+ * <p>This implementation supports a set of leader retrieval listeners that receive
+ * notifications about the current leader.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+	// ------------------------------------------------------------------------
+
+	/** lock for all operations on this instance */
+	private final Object lock = new Object();
+
+	/** The executor service that dispatches notifications */
+	private final Executor notificationExecutor;
+
+	/** The leader ID assigned to the immediate leader */
+	private final UUID leaderId;
+
+	@GuardedBy("lock")
+	private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+	/** The currently proposed leader */
+	@GuardedBy("lock")
+	private volatile LeaderContender proposedLeader;
+
+	/** The confirmed leader */
+	@GuardedBy("lock")
+	private volatile LeaderContender leader;
+
+	/** The address of the confirmed leader */
+	@GuardedBy("lock")
+	private volatile String leaderAddress;
+
+	/** Flag marking this service as shutdown, meaning it cannot be started again */
+	@GuardedBy("lock")
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new leader election service. The service assigns the given leader ID
+	 * to the leader contender.
+	 * 
+	 * @param leaderId The constant leader ID assigned to the leader.
+	 */
+	public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.leaderId = checkNotNull(leaderId);
+		this.listeners = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader election service
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start(LeaderContender contender) throws Exception {
+		checkNotNull(contender, "contender");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader == null, "service already started");
+
+			// directly grant leadership to the given contender
+			proposedLeader = contender;
+			notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId));
+		}
+	}
+
+	@Override
+	public void stop() {
+		synchronized (lock) {
+			// notify all listeners that there is no leader
+			for (EmbeddedLeaderRetrievalService listener : listeners) {
+				notificationExecutor.execute(
+						new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+			}
+
+			// if there was a leader, revoke its leadership
+			if (leader != null) {
+				try {
+					leader.revokeLeadership();
+				} catch (Throwable t) {
+					leader.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+				}
+			}
+
+			proposedLeader = null;
+			leader = null;
+			leaderAddress = null;
+		}
+	}
+
+	@Override
+	public void confirmLeaderSessionID(UUID leaderSessionID) {
+		checkNotNull(leaderSessionID, "leaderSessionID");
+		checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader != null, "no leader proposed yet");
+			checkState(leader == null, "leader already confirmed");
+
+			// accept the confirmation
+			final String address = proposedLeader.getAddress();
+			leaderAddress = address;
+			leader = proposedLeader;
+
+			// notify all listeners
+			for (EmbeddedLeaderRetrievalService listener : listeners) {
+				notificationExecutor.execute(
+						new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
+			}
+		}
+	}
+
+	@Override
+	public boolean hasLeadership() {
+		synchronized (lock) {
+			return leader != null;
+		}
+	}
+
+	void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+		LOG.warn("Error notifying leader listener about new leader", error);
+		contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+		
+		synchronized (lock) {
+			if (proposedLeader == contender) {
+				proposedLeader = null;
+				leader = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	public boolean isShutdown() {
+		return shutdown;
+	}
+
+	public void shutdown() {
+		shutdownInternally(new Exception("The leader service is shutting down"));
+	}
+
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+
+			shutdown = true;
+
+			// fail the leader (if there is one)
+			if (leader != null) {
+				try {
+					leader.handleError(exceptionForHandlers);
+				} catch (Throwable ignored) {}
+			}
+
+			// clear all leader status
+			leader = null;
+			proposedLeader = null;
+			leaderAddress = null;
+
+			// fail all registered listeners
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				service.shutdown(exceptionForHandlers);
+			}
+			listeners.clear();
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader listeners
+	// ------------------------------------------------------------------------
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if we already have a leader, immediately notify this new listener
+				if (leader != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		volatile LeaderRetrievalListener listener;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			checkNotNull(listener);
+			addListener(this, listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		void shutdown(Exception cause) {
+			if (running) {
+				final LeaderRetrievalListener lst = listener;
+				running = false;
+				listener = null;
+
+				try {
+					lst.handleError(cause);
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This runnable informs a leader contender that it gained leadership.
+	 */
+	private class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+
+		GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				errorOnGrantLeadership(contender, t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This runnable informs a leader listener of a new leader
+	 */
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		@Nullable
+		private final String address;       // null if leader revoked without new leader
+		@Nullable
+		private final UUID leaderSessionId; // null if leader revoked without new leader
+
+		private final LeaderRetrievalListener listener;
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+
+
+}

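A sketch of the intended call pattern for this service, using the direct executor from the runtime's concurrent utilities (as the tests below do). The contender implementation and its address are placeholders for illustration; it simply confirms the pre-configured session id it is granted.

import java.util.UUID;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

public class SingleLeaderSketch {
	public static void main(String[] args) throws Exception {
		final UUID leaderId = HighAvailabilityServices.DEFAULT_LEADER_ID;
		final SingleLeaderElectionService service =
				new SingleLeaderElectionService(Executors.directExecutor(), leaderId);

		// retrieval services created here are notified once the contender
		// has confirmed its leader session id
		final LeaderRetrievalService retriever = service.createLeaderRetrievalService();

		// placeholder contender: it is granted leadership immediately on start()
		// and confirms the pre-configured session id back to the service
		final LeaderContender contender = new LeaderContender() {
			@Override
			public String getAddress() {
				return "akka://flink/user/placeholder";
			}

			@Override
			public void grantLeadership(UUID leaderSessionID) {
				service.confirmLeaderSessionID(leaderSessionID);
			}

			@Override
			public void revokeLeadership() {}

			@Override
			public void handleError(Exception exception) {
				exception.printStackTrace();
			}
		};

		service.start(contender);
		// ... listeners registered via 'retriever' now see the confirmed leader ...
		service.shutdown();
	}
}
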
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 474faa8..b10e414 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -25,19 +25,17 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
-import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -132,7 +130,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
+	public void close() throws Exception {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
@@ -149,6 +147,12 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 		}
 	}
 
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// this stores no data, so this method is the same as 'close()'
+		close();
+	}
+
 	private void checkNotShutdown() {
 		checkState(!shutdown, "high availability services are shut down");
 	}
@@ -160,21 +164,4 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	protected ExecutorService getExecutorService() {
 		return executor;
 	}
-
-	private static final class ServicesThreadFactory implements ThreadFactory {
-
-		private AtomicInteger enumerator = new AtomicInteger();
-
-		@Override
-		public Thread newThread(@Nonnull Runnable r) {
-			Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
-
-			// HA threads should have a very high priority, but not
-			// keep the JVM running by themselves
-			thread.setPriority(Thread.MAX_PRIORITY);
-			thread.setDaemon(true);
-
-			return thread;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 9fad9be..d4eba26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -461,7 +461,7 @@ public class EmbeddedLeaderService {
 				contender.grantLeadership(leaderSessionId);
 			}
 			catch (Throwable t) {
-				logger.warn("Error notifying leader listener about new leader", t);
+				logger.warn("Error granting leadership to contender", t);
 				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 16b163c..4ad4646 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -51,6 +51,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
 	 *
 	 * @param leaderAddress The leader's pre-configured address
 	 */
+	@Deprecated
 	public StandaloneLeaderRetrievalService(String leaderAddress) {
 		this.leaderAddress = checkNotNull(leaderAddress);
 		this.leaderId = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 29a6e59..1933554 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -345,7 +345,7 @@ public class MiniCluster {
 		// shut down high-availability services
 		if (haServices != null) {
 			try {
-				haServices.shutdown();
+				haServices.closeAndCleanupAllData();
 			} catch (Exception e) {
 				exception = firstOrSuppressed(e, exception);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 56e72c0..6c7e249 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
 	/** Actions to call when the job leader changes */
 	private JobLeaderIdActions jobLeaderIdActions;
 
-	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) {
+	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 959b727..e0dee0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -46,7 +45,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
-			final MetricRegistry metricRegistry) throws ConfigurationException {
+			final MetricRegistry metricRegistry) throws Exception {
 
 		Preconditions.checkNotNull(configuration);
 		Preconditions.checkNotNull(rpcService);

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 1ac54ac..018c3ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -21,19 +21,36 @@ package org.apache.flink.runtime.rpc;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.NetUtils;
+
 import org.jboss.netty.channel.ChannelException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
+ * or constructing RPC addresses.
+ */
 public class RpcServiceUtils {
+
 	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
 
+	// ------------------------------------------------------------------------
+	//  RPC instantiation
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Utility method to create RPC service from configuration and hostname, port.
 	 *
@@ -78,4 +95,57 @@ public class RpcServiceUtils {
 		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
 		return new AkkaRpcService(actorSystem, timeout);
 	}
+
+	// ------------------------------------------------------------------------
+	//  RPC endpoint addressing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an RPC URL for the given endpoint, deriving whether to use SSL from the given configuration.
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param config       The configuration from which to deduce further settings.
+	 *
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config)
+			throws UnknownHostException {
+
+		checkNotNull(config, "config is null");
+
+		final boolean sslEnabled = config.getBoolean(
+					ConfigConstants.AKKA_SSL_ENABLED,
+					ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+				SSLUtils.getSSLEnabled(config);
+
+		return getRpcUrl(hostname, port, endpointName, sslEnabled);
+	}
+
+	/**
+	 * Creates an RPC URL for the given endpoint.
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param secure       True, if security/encryption is enabled, false otherwise.
+	 * 
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure)
+			throws UnknownHostException {
+
+		checkNotNull(hostname, "hostname is null");
+		checkNotNull(endpointName, "endpointName is null");
+		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+
+		final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
+		final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port);
+
+		return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated */
+	private RpcServiceUtils() {}
 }

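For illustration, a small sketch of the URL format produced by the addressing helper added above. The host, port, and endpoint name are arbitrary example values.

import org.apache.flink.runtime.rpc.RpcServiceUtils;

public class RpcUrlSketch {
	public static void main(String[] args) throws Exception {
		// non-SSL URL for an endpoint named "resource_manager" on localhost:6123
		String url = RpcServiceUtils.getRpcUrl("localhost", 6123, "resource_manager", false);

		// expected to print something like:
		//   akka.tcp://flink@localhost:6123/user/resource_manager
		System.out.println(url);
	}
}
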
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..f1ece0e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testCreateAndSetFinished() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String uri = folder.toURI().toString();
+
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+		// initially, without any call, the job is considered running
+		assertTrue(registry.isJobRunning(jid));
+
+		// repeated setting should not affect the status
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+		assertFalse(otherRegistry.isJobRunning(jid));
+	}
+
+	@Test
+	public void testSetFinishedAndRunning() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String uri = folder.toURI().toString();
+
+		final JobID jid = new JobID();
+
+		FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+		// set the job to finished and validate
+		registry.setJobFinished(jid);
+		assertFalse(registry.isJobRunning(jid));
+
+		// set the job to back to running and validate
+		registry.setJobRunning(jid);
+		assertTrue(registry.isJobRunning(jid));
+
+		// another registry should pick this up
+		FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+		assertTrue(otherRegistry.isJobRunning(jid));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e0f71ee..3f9865c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,12 +155,22 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		return new VoidBlobStore();
 	}
 
+	@Override
+	public String getResourceManagerEndpointName() {
+		throw new UnsupportedOperationException();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Shutdown
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void shutdown() throws Exception {
-		// nothing to do, since this should not shut down individual services, but cross service parts
+	public void close() throws Exception {
+		// nothing to do
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// nothing to do
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..a9805a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+	private static final Random RND = new Random();
+
+	private final Executor executor = Executors.directExecutor();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testStartStopAssignLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mockContender(service);
+		final LeaderContender otherContender = mockContender(service);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+		verify(contender, times(1)).revokeLeadership();
+
+		// start with a new contender - the old contender must not gain another leadership
+		service.start(otherContender);
+		verify(otherContender, times(1)).grantLeadership(uuid);
+
+		verify(contender, times(1)).grantLeadership(uuid);
+		verify(contender, times(1)).revokeLeadership();
+	}
+
+	@Test
+	public void testStopBeforeConfirmingLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+
+		// because the leadership was never confirmed, there is no "revoke" call
+		verifyNoMoreInteractions(contender);
+	}
+
+	@Test
+	public void testStartOnlyOnce() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+		final LeaderContender otherContender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// should not be possible to start again this with another contender
+		try {
+			service.start(otherContender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// should not be possible to start this again with the same contender
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		// create a leader contender and let it grab leadership
+		final LeaderContender contender = mockContender(service);
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// some leader listeners
+		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+
+		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+
+		listenerService1.start(listener1);
+		listenerService2.start(listener2);
+
+		// one listener stops
+		listenerService1.stop();
+
+		// shut down the service
+		service.shutdown();
+
+		// the leader contender and running listener should get error notifications
+		verify(contender, times(1)).handleError(any(Exception.class));
+		verify(listener2, times(1)).handleError(any(Exception.class));
+
+		// the stopped listener gets no notification
+		verify(listener1, times(0)).handleError(any(Exception.class));
+
+		// should not be possible to start again after shutdown
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// no additional leadership grant
+		verify(contender, times(1)).grantLeadership(any(UUID.class));
+	}
+
+	@Test
+	public void testImmediateShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+		service.shutdown();
+
+		final LeaderContender contender = mock(LeaderContender.class);
+		
+		// should not be possible to start
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// no additional leadership grant
+		verify(contender, times(0)).grantLeadership(any(UUID.class));
+	}
+
+//	@Test
+//	public void testNotifyListenersWhenLeaderElected() throws Exception {
+//		final UUID uuid = UUID.randomUUID();
+//		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+//
+//		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+//		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+//
+//		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+//		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+//
+//		listenerService1.start(listener1);
+//		listenerService1.start(listener2);
+//
+//		final LeaderContender contender = mockContender(service);
+//		service.start(contender);
+//
+//		veri
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static LeaderContender mockContender(final LeaderElectionService service) {
+		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+		return mockContender(service, address);
+	}
+
+	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+		LeaderContender mockContender = mock(LeaderContender.class);
+
+		when(mockContender.getAddress()).thenReturn(address);
+
+		doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					final UUID uuid = (UUID) invocation.getArguments()[0];
+					service.confirmLeaderSessionID(uuid);
+					return null;
+				}
+		}).when(mockContender).grantLeadership(any(UUID.class));
+
+		return mockContender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 9ce57d5..7eb220d 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -32,16 +32,13 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+
+		<!-- core dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_2.10</artifactId>
 			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>hadoop-core</artifactId>
-					<groupId>org.apache.hadoop</groupId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
 		<dependency>
@@ -57,46 +54,48 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
-		</dependency>
+		<!-- test dependencies -->
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-camel_${scala.binary.version}</artifactId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
 			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
 			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
 		</dependency>
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 8e3418c..61208c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -108,7 +108,7 @@ public class YarnApplicationMasterRunner {
 	 * @param args The command line arguments.
 	 */
 	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e58c77e..188d9ef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -57,7 +56,12 @@ import java.io.FileInputStream;
 import java.io.ObjectInputStream;
 
 /**
- * This class is the executable entry point for the YARN application master.
+ * This class is the executable entry point for the YARN Application Master that
+ * executes a single Flink job and then shuts the YARN application down.
+ * 
+ * <p>The lifetime of the YARN application is bound to that of the Flink job. Other
+ * YARN Application Master implementations include, for example, the YARN session.
+ * 
  * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
  * and {@link org.apache.flink.yarn.YarnResourceManager}.
  *
@@ -74,6 +78,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	/** The job graph file path */
 	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
 
+	// ------------------------------------------------------------------------
+
 	/** The lock to guard startup / shutdown / manipulation methods */
 	private final Object lock = new Object();
 
@@ -105,7 +111,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	 * @param args The command line arguments.
 	 */
 	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
@@ -127,7 +133,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
 
 			synchronized (lock) {
+				LOG.info("Starting High Availability Services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+				
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
 
@@ -176,7 +184,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
-	private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
 		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
 		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
@@ -242,7 +250,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			}
 			if (haServices != null) {
 				try {
-					haServices.shutdown();
+					haServices.close();
 				} catch (Throwable tt) {
 					LOG.warn("Failed to stop the HA service", tt);
 				}
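The JOB_GRAPH_FILE_PATH property introduced above tells this single-job runner where the serialized JobGraph lives. As a hedged illustration only (the patch does not show the actual loading code; the method and variable names here are assumptions), deserializing such a file inside the runner class, which already imports FileInputStream and ObjectInputStream, could look like this:

	// Illustrative sketch, not the runner's actual code: load the serialized JobGraph
	// from the path configured under "flink.jobgraph.path".
	private static JobGraph loadJobGraph(Configuration config) throws Exception {
		final String jobGraphPath = config.getString(JOB_GRAPH_FILE_PATH, null);
		if (jobGraphPath == null) {
			throw new IllegalStateException("Missing config value: " + JOB_GRAPH_FILE_PATH);
		}
		try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(jobGraphPath))) {
			return (JobGraph) in.readObject();
		}
	}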

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index d9912eb..7808152 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -240,7 +240,7 @@ public class YarnTaskExecutorRunner {
 			}
 			if (haServices != null) {
 				try {
-					haServices.shutdown();
+					haServices.close();
 				} catch (Throwable tt) {
 					LOG.warn("Failed to stop the HA service", tt);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
new file mode 100644
index 0000000..c3902d3
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by Flink's YARN runners.
+ * These options are not expected to ever be configured explicitly by users.
+ */
+public class YarnConfigOptions {
+
+	/**
+	 * The hostname or address where the application master RPC system is listening.
+	 */
+	public static final ConfigOption<String> APP_MASTER_RPC_ADDRESS =
+			key("yarn.appmaster.rpc.address")
+			.noDefaultValue();
+
+	/**
+	 * The port where the application master RPC system is listening.
+	 */
+	public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
+			key("yarn.appmaster.rpc.address")
+			.defaultValue(-1);
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated */
+	private YarnConfigOptions() {}
+}
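Since these two options are written by the framework when the application master comes up and read back by other processes, a hedged usage sketch may help (variable names are illustrative; the string-keyed Configuration getters and setters are used here to stay on well-established APIs):

	// Illustrative only: publish the AM RPC location, then read it back elsewhere.
	Configuration flinkConfig = new Configuration();
	flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key(), "appmaster-host.example");
	flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT.key(), 12345);

	String amAddress = flinkConfig.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key(), null);
	int amPort = flinkConfig.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT.key(), -1);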

http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
new file mode 100644
index 0000000..7aa481f
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for the high-availability services for Flink YARN applications that do not
+ * support master (JobManager) failover.
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ */
+public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
+
+	/** The constant name of the ResourceManager RPC endpoint */
+	protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YARN high-availability services, configuring the file system and recovery
+	 * data directory based on the working directory in the given Hadoop configuration.
+	 *
+	 * <p>This class requires that the default Hadoop file system configured in the given
+	 * Hadoop configuration is an HDFS.
+	 *
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 */
+	protected AbstractYarnNonHaServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+		super(config, hadoopConf);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Names
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getResourceManagerEndpointName() {
+		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws IOException {
+		enter();
+		try {
+			// IMPORTANT: The registry must NOT place its data in a directory that is
+			// cleaned up by these services.
+			return new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory);
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+		enter();
+		try {
+			return new StandaloneCheckpointRecoveryFactory();
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		throw new UnsupportedOperationException("These High-Availability Services do not support storing job graphs");
+	}
+}
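To show where these services sit in a runner's lifecycle, here is a hedged sketch under the assumption of some concrete subclass; createServices() below is a hypothetical factory and not part of this patch, and only lifecycle calls visible in this change (getRunningJobsRegistry(), getCheckpointRecoveryFactory(), close()) are used:

	// Illustrative only: obtain the services, use them, and always close them at the end.
	Configuration flinkConfig = new Configuration();
	org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();

	AbstractYarnNonHaServices haServices = createServices(flinkConfig, hadoopConfig); // hypothetical factory
	try {
		RunningJobsRegistry runningJobs = haServices.getRunningJobsRegistry();
		CheckpointRecoveryFactory checkpointRecovery = haServices.getCheckpointRecoveryFactory();
		// ... start the ResourceManager / JobManagerRunner against these services ...
	}
	finally {
		haServices.close();
	}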