You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:07 UTC

[02/16] flink git commit: [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b22f1a6..05749c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -56,31 +56,20 @@ public class LeaderRetrievalUtils {
 	 * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
 	 *
 	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
-	 * @return The {@link LeaderRetrievalService} specified in the configuration object
-	 * @throws Exception
-	 */
-	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
-		throws Exception {
-		return createLeaderRetrievalService(configuration, false);
-	}
-
-	/**
-	 * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
-	 *
-	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
 	 * @param resolveInitialHostName If true, resolves the initial hostname
 	 * @return The {@link LeaderRetrievalService} specified in the configuration object
 	 * @throws Exception
 	 */
 	public static LeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration, boolean resolveInitialHostName)
+			Configuration configuration,
+			boolean resolveInitialHostName)
 		throws Exception {
 
 		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
 		switch (highAvailabilityMode) {
 			case NONE:
-				return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName);
+				return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName, null);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
index 8436ced..1719b38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.NetUtils;
-import scala.Option;
-import scala.Tuple3;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.util.ConfigurationException;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 /**
@@ -40,27 +40,15 @@ public final class StandaloneUtils {
 	 *
 	 * @param configuration Configuration instance containing the host and port information
 	 * @return StandaloneLeaderRetrievalService
+	 * @throws ConfigurationException
 	 * @throws UnknownHostException
 	 */
-	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
-		Configuration configuration)
-		throws UnknownHostException {
-		return createLeaderRetrievalService(configuration, false);
-	}
-
-	/**
-	 * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration. The
-	 * host and port for the remote Akka URL are retrieved from the provided configuration.
-	 *
-	 * @param configuration Configuration instance containing the host and port information
-	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
-	 * @return StandaloneLeaderRetrievalService
-	 * @throws UnknownHostException
-	 */
-	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration, boolean resolveInitialHostName)
-		throws UnknownHostException {
-		return createLeaderRetrievalService(configuration, resolveInitialHostName, null);
+	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
+		throws ConfigurationException, UnknownHostException {
+		return createLeaderRetrievalService(
+			configuration,
+			false,
+			null);
 	}
 
 	/**
@@ -73,38 +61,22 @@ public final class StandaloneUtils {
 	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
 	 * @param jobManagerName Name of the JobManager actor
 	 * @return StandaloneLeaderRetrievalService
-	 * @throws UnknownHostException if the host name cannot be resolved into an {@link InetAddress}
+	 * @throws ConfigurationException if the job manager address cannot be retrieved from the configuration
+	 * @throws UnknownHostException if the job manager address cannot be resolved
 	 */
 	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration,
 			boolean resolveInitialHostName,
 			String jobManagerName)
-		throws UnknownHostException {
-
-		Tuple3<String, String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration);
-
-		String protocol = stringIntPair._1();
-		String jobManagerHostname = stringIntPair._2();
-		int jobManagerPort = (Integer) stringIntPair._3();
-
-		// Do not try to resolve a hostname to prevent resolving to the wrong IP address
-		String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(jobManagerHostname, jobManagerPort);
-
-		if (resolveInitialHostName) {
-			try {
-				//noinspection ResultOfMethodCallIgnored
-				InetAddress.getByName(jobManagerHostname);
-			}
-			catch (UnknownHostException e) {
-				throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
-					+ "' specified in the configuration");
-			}
-		}
+		throws ConfigurationException, UnknownHostException {
+		Tuple2<String, Integer> hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(configuration);
 
-		String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(
-				protocol,
-				hostPort,
-				Option.apply(jobManagerName));
+		String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+			hostnamePort.f0,
+			hostnamePort.f1,
+			jobManagerName != null ? jobManagerName : JobMaster.JOB_MANAGER_NAME,
+			resolveInitialHostName ? AddressResolution.TRY_ADDRESS_RESOLUTION : AddressResolution.NO_ADDRESS_RESOLUTION,
+			configuration);
 
 		return new StandaloneLeaderRetrievalService(jobManagerAkkaUrl);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index d94eb7a..62fa73d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -28,7 +28,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.net.SSLUtils
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{ConfigurationException, NetUtils, Preconditions}
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
 import org.slf4j.LoggerFactory
 
@@ -707,18 +707,15 @@ object AkkaUtils {
       "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
       "(µs|micro|microsecond)|(ns|nano|nanosecond)"
   }
-  
-  /** Returns the protocol field for the URL of the remote actor system given the user configuration
+
+  /**
+    * Returns the local akka url for the given actor name.
     *
-    * @param config instance containing the user provided configuration values
-    * @return the remote url's protocol field
+    * @param actorName Actor name identifying the actor
+    * @return Local Akka URL for the given actor
     */
-  def getAkkaProtocol(config: Configuration): String = {
-    val sslEnabled = config.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
-        ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
-      SSLUtils.getSSLEnabled(config)
-    if (sslEnabled) "akka.ssl.tcp" else "akka.tcp"
+  def getLocalAkkaURL(actorName: String): String = {
+    "akka://flink/user/" + actorName
   }
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 479ec51..68f43da 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,11 +49,15 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
+import org.apache.flink.runtime.jobmaster.JobMaster
+import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
@@ -72,8 +76,11 @@ import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
+import org.apache.flink.runtime.taskexecutor.TaskExecutor
+import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -1923,12 +1930,6 @@ object JobManager {
   val STARTUP_FAILURE_RETURN_CODE = 1
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
-  /** Name of the JobManager actor */
-  val JOB_MANAGER_NAME = "jobmanager"
-
-  /** Name of the archive actor */
-  val ARCHIVE_NAME = "archive"
-
 
   /**
    * Entry point (main method) to run the JobManager in a standalone fashion.
@@ -2237,7 +2238,7 @@ object JobManager {
       if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
         val leaderRetrievalService = LeaderRetrievalUtils
-          .createLeaderRetrievalService(configuration)
+          .createLeaderRetrievalService(configuration, false)
 
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
@@ -2287,7 +2288,7 @@ object JobManager {
           ResourceID.generate(),
           jobManagerSystem,
           externalHostname,
-          Some(TaskManager.TASK_MANAGER_NAME),
+          Some(TaskExecutor.TASK_MANAGER_NAME),
           None,
           localTaskManagerCommunication = true,
           classOf[TaskManager])
@@ -2305,7 +2306,13 @@ object JobManager {
       // start web monitor
       webMonitor.foreach {
         monitor =>
-          val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
+          val hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(configuration)
+          val jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+            hostnamePort.f0,
+            hostnamePort.f1,
+            JobMaster.JOB_MANAGER_NAME,
+            AddressResolution.NO_ADDRESS_RESOLUTION,
+            configuration)
           monitor.start(jobManagerAkkaUrl)
       }
 
@@ -2317,7 +2324,7 @@ object JobManager {
               FlinkResourceManager.startResourceManagerActors(
                 configuration,
                 jobManagerSystem,
-                LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+                LeaderRetrievalUtils.createLeaderRetrievalService(configuration, false),
                 rmClass))
           case None =>
             LOG.info("Resource Manager class not provided. No resource manager will be started.")
@@ -2631,8 +2638,8 @@ object JobManager {
       actorSystem,
       futureExecutor,
       ioExecutor,
-      Some(JOB_MANAGER_NAME),
-      Some(ARCHIVE_NAME),
+      Some(JobMaster.JOB_MANAGER_NAME),
+      Some(JobMaster.ARCHIVE_NAME),
       jobManagerClass,
       archiveClass)
   }
@@ -2761,121 +2768,4 @@ object JobManager {
       jobRecoveryTimeout,
       metricsRegistry)
   }
-
-  // --------------------------------------------------------------------------
-  //  Resolving the JobManager endpoint
-  // --------------------------------------------------------------------------
-
-  /**
-   * Builds the akka actor path for the JobManager actor, given the socket address
-   * where the JobManager's actor system runs.
-   *
-   * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param hostPort The external address of the JobManager's actor system in format host:port
-   * @return The akka URL of the JobManager actor.
-   */
-  def getRemoteJobManagerAkkaURL(
-      protocol: String,
-      hostPort: String,
-      name: Option[String] = None)
-    : String = {
-
-    require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp",
-        "protocol field should be either akka.tcp or akka.ssl.tcp")
-
-    getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name)
-  }
-
-  /**
-   * Returns the JobManager actor's remote Akka URL, given the configured hostname and port.
-   *
-   * @param config The configuration to parse
-   * @return JobManager actor remote Akka URL
-   */
-  def getRemoteJobManagerAkkaURL(config: Configuration) : String = {
-    val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config)
-
-    val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)
-
-    JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty)
-  }
-
-  /**
-   * Builds the akka actor path for the JobManager actor to address the actor within
-   * its own actor system.
-   *
-   * @return The local akka URL of the JobManager actor.
-   */
-  def getLocalJobManagerAkkaURL(name: Option[String] = None): String = {
-    getJobManagerAkkaURLHelper("akka://flink", name)
-  }
-
-  def getJobManagerAkkaURL(system: ActorSystem, name: Option[String] = None): String = {
-    getJobManagerAkkaURLHelper(AkkaUtils.getAddress(system).toString, name)
-  }
-
-  private def getJobManagerAkkaURLHelper(address: String, name: Option[String]): String = {
-    address + "/user/" + name.getOrElse(JOB_MANAGER_NAME)
-  }
-
-  /**
-   * Resolves the JobManager actor reference in a blocking fashion.
-   *
-   * @param jobManagerUrl The akka URL of the JobManager.
-   * @param system The local actor system that should perform the lookup.
-   * @param timeout The maximum time to wait until the lookup fails.
-   * @throws java.io.IOException Thrown, if the lookup fails.
-   * @return The ActorRef to the JobManager
-   */
-  @throws(classOf[IOException])
-  def getJobManagerActorRef(
-      jobManagerUrl: String,
-      system: ActorSystem,
-      timeout: FiniteDuration)
-    : ActorRef = {
-    AkkaUtils.getActorRef(jobManagerUrl, system, timeout)
-  }
-
-  /**
-   * Resolves the JobManager actor reference in a blocking fashion.
-   *
-   * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param hostPort The external address of the JobManager's actor system in format host:port.
-   * @param system The local actor system that should perform the lookup.
-   * @param timeout The maximum time to wait until the lookup fails.
-   * @throws java.io.IOException Thrown, if the lookup fails.
-   * @return The ActorRef to the JobManager
-   */
-  @throws(classOf[IOException])
-  def getJobManagerActorRef(
-      protocol: String,
-      hostPort: String,
-      system: ActorSystem,
-      timeout: FiniteDuration)
-    : ActorRef = {
-
-    val jmAddress = getRemoteJobManagerAkkaURL(protocol, hostPort)
-    getJobManagerActorRef(jmAddress, system, timeout)
-  }
-
-  /**
-   * Resolves the JobManager actor reference in a blocking fashion.
-   *
-   * @param hostPort The address of the JobManager's actor system in format host:port.
-   * @param system The local actor system that should perform the lookup.
-   * @param config The config describing the maximum time to wait until the lookup fails.
-   * @throws java.io.IOException Thrown, if the lookup fails.
-   * @return The ActorRef to the JobManager
-   */
-  @throws(classOf[IOException])
-  def getJobManagerActorRef(
-      hostPort: String,
-      system: ActorSystem,
-      config: Configuration)
-    : ActorRef = {
-
-    val timeout = AkkaUtils.getLookupTimeout(config)
-    val protocol = AkkaUtils.getAkkaProtocol(config)
-    getJobManagerActorRef(protocol, hostPort, system, timeout)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3d43da5..2f83548 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -38,13 +38,14 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.jobmaster.JobMaster
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 
@@ -207,9 +208,9 @@ class LocalFlinkMiniCluster(
     val localExecution = numTaskManagers == 1
 
     val taskManagerActorName = if (singleActorSystem) {
-      TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
+      TaskExecutor.TASK_MANAGER_NAME + "_" + (index + 1)
     } else {
-      TaskManager.TASK_MANAGER_NAME
+      TaskExecutor.TASK_MANAGER_NAME
     }
 
     val resourceID = ResourceID.generate() // generate random resource id
@@ -397,9 +398,9 @@ class LocalFlinkMiniCluster(
 
   protected def getJobManagerName(index: Int): String = {
     if(singleActorSystem) {
-      JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
+      JobMaster.JOB_MANAGER_NAME + "_" + (index + 1)
     } else {
-      JobManager.JOB_MANAGER_NAME
+      JobMaster.JOB_MANAGER_NAME
     }
   }
 
@@ -413,9 +414,9 @@ class LocalFlinkMiniCluster(
 
   protected def getArchiveName(index: Int): String = {
     if(singleActorSystem) {
-      JobManager.ARCHIVE_NAME + "_" + (index + 1)
+      JobMaster.ARCHIVE_NAME + "_" + (index + 1)
     } else {
-      JobManager.ARCHIVE_NAME
+      JobMaster.ARCHIVE_NAME
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 97e55f0..4065660 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -64,7 +64,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 
@@ -1496,9 +1496,6 @@ object TaskManager {
   /** Return code for critical errors during the runtime */
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
-  /** The name of the TaskManager actor */
-  val TASK_MANAGER_NAME = "taskmanager"
-
   /** Maximum time (milli seconds) that the TaskManager will spend searching for a
     * suitable network interface to use for communication */
   val MAX_STARTUP_CONNECT_TIME = 120000L
@@ -1664,7 +1661,9 @@ object TaskManager {
       LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
     }
     else {
-      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(
+        configuration,
+        true)
       val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
 
       val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
@@ -1780,7 +1779,7 @@ object TaskManager {
         resourceID,
         taskManagerSystem,
         taskManagerHostname,
-        Some(TASK_MANAGER_NAME),
+        Some(TaskExecutor.TASK_MANAGER_NAME),
         None,
         localTaskManagerCommunication = false,
         taskManagerClass)
@@ -1969,38 +1968,4 @@ object TaskManager {
         throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e)
     }
   }
-
-  // --------------------------------------------------------------------------
-  //  Parsing and checking the TaskManager Configuration
-  // --------------------------------------------------------------------------
-
-  /**
-   * Gets the protocol, hostname and port of the JobManager from the configuration. Also checks that
-   * the hostname is not null and the port non-negative.
-   *
-   * @param configuration The configuration to read the config values from.
-   * @return A 3-tuple (protocol, hostname, port).
-   */
-  def getAndCheckJobManagerAddress(configuration: Configuration) : (String, String, Int) = {
-
-    val protocol = AkkaUtils.getAkkaProtocol(configuration)
-
-    val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-
-    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if (hostname == null) {
-      throw new Exception("Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
-        "' is missing (hostname/address of JobManager to connect to).")
-    }
-
-    if (port <= 0 || port >= 65536) {
-      throw new Exception("Invalid value for '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-        "' (port of the JobManager actor system) : " + port +
-        ".  it must be great than 0 and less than 65536.")
-    }
-
-    (protocol, hostname, port)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 37a86c7..830dbf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -147,7 +147,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public RunningJobsRegistry getRunningJobsRegistry() {
-		return new NonHaRegistry();
+		return new StandaloneRunningJobsRegistry();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
deleted file mode 100644
index b1881e5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class ZooKeeperRegistryTest extends TestLogger {
-
-	private TestingServer testingServer;
-
-	@Before
-	public void before() throws Exception {
-		testingServer = new TestingServer();
-	}
-
-	@After
-	public void after() throws Exception {
-		testingServer.stop();
-		testingServer = null;
-	}
-
-	/**
-	 * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning()
-	 */
-	@Test
-	public void testZooKeeperRegistry() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
-		final HighAvailabilityServices zkHaService = new ZookeeperHaServices(
-				ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
-
-		final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
-
-		try {
-			JobID jobID = JobID.generate();
-			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
-
-			zkRegistry.setJobRunning(jobID);
-			assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
-
-			zkRegistry.setJobFinished(jobID);
-			assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
-
-			zkRegistry.clearJob(jobID);
-			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
-		} finally {
-			zkHaService.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
deleted file mode 100644
index a9805a1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.leaderelection;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.StringUtils;
-
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the {@link SingleLeaderElectionService}.
- */
-public class SingleLeaderElectionServiceTest {
-
-	private static final Random RND = new Random();
-
-	private final Executor executor = Executors.directExecutor();
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testStartStopAssignLeadership() throws Exception {
-		final UUID uuid = UUID.randomUUID();
-		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
-		final LeaderContender contender = mockContender(service);
-		final LeaderContender otherContender = mockContender(service);
-
-		service.start(contender);
-		verify(contender, times(1)).grantLeadership(uuid);
-
-		service.stop();
-		verify(contender, times(1)).revokeLeadership();
-
-		// start with a new contender - the old contender must not gain another leadership
-		service.start(otherContender);
-		verify(otherContender, times(1)).grantLeadership(uuid);
-
-		verify(contender, times(1)).grantLeadership(uuid);
-		verify(contender, times(1)).revokeLeadership();
-	}
-
-	@Test
-	public void testStopBeforeConfirmingLeadership() throws Exception {
-		final UUID uuid = UUID.randomUUID();
-		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
-		final LeaderContender contender = mock(LeaderContender.class);
-
-		service.start(contender);
-		verify(contender, times(1)).grantLeadership(uuid);
-
-		service.stop();
-
-		// because the leadership was never confirmed, there is no "revoke" call
-		verifyNoMoreInteractions(contender);
-	}
-
-	@Test
-	public void testStartOnlyOnce() throws Exception {
-		final UUID uuid = UUID.randomUUID();
-		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
-		final LeaderContender contender = mock(LeaderContender.class);
-		final LeaderContender otherContender = mock(LeaderContender.class);
-
-		service.start(contender);
-		verify(contender, times(1)).grantLeadership(uuid);
-
-		// should not be possible to start again this with another contender
-		try {
-			service.start(otherContender);
-			fail("should fail with an exception");
-		} catch (IllegalStateException e) {
-			// expected
-		}
-
-		// should not be possible to start this again with the same contender
-		try {
-			service.start(contender);
-			fail("should fail with an exception");
-		} catch (IllegalStateException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testShutdown() throws Exception {
-		final UUID uuid = UUID.randomUUID();
-		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
-		// create a leader contender and let it grab leadership
-		final LeaderContender contender = mockContender(service);
-		service.start(contender);
-		verify(contender, times(1)).grantLeadership(uuid);
-
-		// some leader listeners
-		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
-		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
-
-		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
-		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
-
-		listenerService1.start(listener1);
-		listenerService2.start(listener2);
-
-		// one listener stops
-		listenerService1.stop();
-
-		// shut down the service
-		service.shutdown();
-
-		// the leader contender and running listener should get error notifications
-		verify(contender, times(1)).handleError(any(Exception.class));
-		verify(listener2, times(1)).handleError(any(Exception.class));
-
-		// the stopped listener gets no notification
-		verify(listener1, times(0)).handleError(any(Exception.class));
-
-		// should not be possible to start again after shutdown
-		try {
-			service.start(contender);
-			fail("should fail with an exception");
-		} catch (IllegalStateException e) {
-			// expected
-		}
-
-		// no additional leadership grant
-		verify(contender, times(1)).grantLeadership(any(UUID.class));
-	}
-
-	@Test
-	public void testImmediateShutdown() throws Exception {
-		final UUID uuid = UUID.randomUUID();
-		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-		service.shutdown();
-
-		final LeaderContender contender = mock(LeaderContender.class);
-		
-		// should not be possible to start
-		try {
-			service.start(contender);
-			fail("should fail with an exception");
-		} catch (IllegalStateException e) {
-			// expected
-		}
-
-		// no additional leadership grant
-		verify(contender, times(0)).grantLeadership(any(UUID.class));
-	}
-
-//	@Test
-//	public void testNotifyListenersWhenLeaderElected() throws Exception {
-//		final UUID uuid = UUID.randomUUID();
-//		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-//
-//		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
-//		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
-//
-//		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
-//		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
-//
-//		listenerService1.start(listener1);
-//		listenerService1.start(listener2);
-//
-//		final LeaderContender contender = mockContender(service);
-//		service.start(contender);
-//
-//		veri
-//	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static LeaderContender mockContender(final LeaderElectionService service) {
-		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
-		return mockContender(service, address);
-	}
-
-	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
-		LeaderContender mockContender = mock(LeaderContender.class);
-
-		when(mockContender.getAddress()).thenReturn(address);
-
-		doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocation) throws Throwable {
-					final UUID uuid = (UUID) invocation.getArguments()[0];
-					service.confirmLeaderSessionID(uuid);
-					return null;
-				}
-		}).when(mockContender).grantLeadership(any(UUID.class));
-
-		return mockContender;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
new file mode 100644
index 0000000..7bf9364
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link EmbeddedHaServices}.
+ */
+public class EmbeddedHaServicesTest extends TestLogger {
+
+	private EmbeddedHaServices embeddedHaServices;
+
+	@Before
+	public void setupTest() {
+		embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
+	}
+
+	@After
+	public void teardownTest() throws Exception {
+		if (embeddedHaServices != null) {
+			embeddedHaServices.closeAndCleanupAllData();
+			embeddedHaServices = null;
+		}
+	}
+
+	/**
+	 * Tests that exactly one JobManager is elected as the leader for a given job id.
+	 */
+	@Test
+	public void testJobManagerLeaderElection() throws Exception {
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		LeaderContender leaderContender1 = mock(LeaderContender.class);
+		LeaderContender leaderContender2 = mock(LeaderContender.class);
+		LeaderContender leaderContenderDifferentJobId = mock(LeaderContender.class);
+
+		LeaderElectionService leaderElectionService1 = embeddedHaServices.getJobManagerLeaderElectionService(jobId1);
+		LeaderElectionService leaderElectionService2 = embeddedHaServices.getJobManagerLeaderElectionService(jobId1);
+		LeaderElectionService leaderElectionServiceDifferentJobId = embeddedHaServices.getJobManagerLeaderElectionService(jobId2);
+
+		leaderElectionService1.start(leaderContender1);
+		leaderElectionService2.start(leaderContender2);
+		leaderElectionServiceDifferentJobId.start(leaderContenderDifferentJobId);
+
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor1 = ArgumentCaptor.forClass(UUID.class);
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor2 = ArgumentCaptor.forClass(UUID.class);
+		verify(leaderContender1, atLeast(0)).grantLeadership(leaderIdArgumentCaptor1.capture());
+		verify(leaderContender2, atLeast(0)).grantLeadership(leaderIdArgumentCaptor2.capture());
+
+		assertTrue(leaderIdArgumentCaptor1.getAllValues().isEmpty() ^ leaderIdArgumentCaptor2.getAllValues().isEmpty());
+
+		verify(leaderContenderDifferentJobId).grantLeadership(any(UUID.class));
+	}
+
+	/**
+	 * Tests that exactly one ResourceManager is elected as the leader.
+	 */
+	@Test
+	public void testResourceManagerLeaderElection() throws Exception {
+		LeaderContender leaderContender1 = mock(LeaderContender.class);
+		LeaderContender leaderContender2 = mock(LeaderContender.class);
+
+		LeaderElectionService leaderElectionService1 = embeddedHaServices.getResourceManagerLeaderElectionService();
+		LeaderElectionService leaderElectionService2 = embeddedHaServices.getResourceManagerLeaderElectionService();
+
+		leaderElectionService1.start(leaderContender1);
+		leaderElectionService2.start(leaderContender2);
+
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor1 = ArgumentCaptor.forClass(UUID.class);
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor2 = ArgumentCaptor.forClass(UUID.class);
+		verify(leaderContender1, atLeast(0)).grantLeadership(leaderIdArgumentCaptor1.capture());
+		verify(leaderContender2, atLeast(0)).grantLeadership(leaderIdArgumentCaptor2.capture());
+
+		assertTrue(leaderIdArgumentCaptor1.getAllValues().isEmpty() ^ leaderIdArgumentCaptor2.getAllValues().isEmpty());
+	}
+
+	/**
+	 * Tests the JobManager leader retrieval for a given job.
+	 */
+	@Test
+	public void testJobManagerLeaderRetrieval() throws Exception {
+		final String address = "foobar";
+		JobID jobId = new JobID();
+		LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
+		LeaderContender leaderContender = mock(LeaderContender.class);
+		when(leaderContender.getAddress()).thenReturn(address);
+
+		LeaderElectionService leaderElectionService = embeddedHaServices.getJobManagerLeaderElectionService(jobId);
+		LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getJobManagerLeaderRetriever(jobId);
+
+		leaderRetrievalService.start(leaderRetrievalListener);
+		leaderElectionService.start(leaderContender);
+
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+		verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());
+
+		final UUID leaderId = leaderIdArgumentCaptor.getValue();
+
+		leaderElectionService.confirmLeaderSessionID(leaderId);
+
+		verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
+	}
+
+	/**
+	 * Tests the ResourceManager leader retrieval for a given job.
+	 */
+	@Test
+	public void testResourceManagerLeaderRetrieval() throws Exception {
+		final String address = "foobar";
+		LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
+		LeaderContender leaderContender = mock(LeaderContender.class);
+		when(leaderContender.getAddress()).thenReturn(address);
+
+		LeaderElectionService leaderElectionService = embeddedHaServices.getResourceManagerLeaderElectionService();
+		LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getResourceManagerLeaderRetriever();
+
+		leaderRetrievalService.start(leaderRetrievalListener);
+		leaderElectionService.start(leaderContender);
+
+		ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+		verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());
+
+		final UUID leaderId = leaderIdArgumentCaptor.getValue();
+
+		leaderElectionService.confirmLeaderSessionID(leaderId);
+
+		verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..3875c4f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+	private static final Random RND = new Random();
+
+	private final Executor executor = Executors.directExecutor();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testStartStopAssignLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mockContender(service);
+		final LeaderContender otherContender = mockContender(service);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+		verify(contender, times(1)).revokeLeadership();
+
+		// start with a new contender - the old contender must not gain another leadership
+		service.start(otherContender);
+		verify(otherContender, times(1)).grantLeadership(uuid);
+
+		verify(contender, times(1)).grantLeadership(uuid);
+		verify(contender, times(1)).revokeLeadership();
+	}
+
+	@Test
+	public void testStopBeforeConfirmingLeadership() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		service.stop();
+
+		// because the leadership was never confirmed, there is no "revoke" call
+		verifyNoMoreInteractions(contender);
+	}
+
+	@Test
+	public void testStartOnlyOnce() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		final LeaderContender contender = mock(LeaderContender.class);
+		final LeaderContender otherContender = mock(LeaderContender.class);
+
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// should not be possible to start again this with another contender
+		try {
+			service.start(otherContender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// should not be possible to start this again with the same contender
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+		// create a leader contender and let it grab leadership
+		final LeaderContender contender = mockContender(service);
+		service.start(contender);
+		verify(contender, times(1)).grantLeadership(uuid);
+
+		// some leader listeners
+		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+
+		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+
+		listenerService1.start(listener1);
+		listenerService2.start(listener2);
+
+		// one listener stops
+		listenerService1.stop();
+
+		// shut down the service
+		service.shutdown();
+
+		// the leader contender and running listener should get error notifications
+		verify(contender, times(1)).handleError(any(Exception.class));
+		verify(listener2, times(1)).handleError(any(Exception.class));
+
+		// the stopped listener gets no notification
+		verify(listener1, times(0)).handleError(any(Exception.class));
+
+		// should not be possible to start again after shutdown
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// no additional leadership grant
+		verify(contender, times(1)).grantLeadership(any(UUID.class));
+	}
+
+	@Test
+	public void testImmediateShutdown() throws Exception {
+		final UUID uuid = UUID.randomUUID();
+		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+		service.shutdown();
+
+		final LeaderContender contender = mock(LeaderContender.class);
+		
+		// should not be possible to start
+		try {
+			service.start(contender);
+			fail("should fail with an exception");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+
+		// no additional leadership grant
+		verify(contender, times(0)).grantLeadership(any(UUID.class));
+	}
+
+//	@Test
+//	public void testNotifyListenersWhenLeaderElected() throws Exception {
+//		final UUID uuid = UUID.randomUUID();
+//		final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+//
+//		final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+//		final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+//
+//		LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+//		LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+//
+//		listenerService1.start(listener1);
+//		listenerService1.start(listener2);
+//
+//		final LeaderContender contender = mockContender(service);
+//		service.start(contender);
+//
+//		veri
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static LeaderContender mockContender(final LeaderElectionService service) {
+		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+		return mockContender(service, address);
+	}
+
+	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+		LeaderContender mockContender = mock(LeaderContender.class);
+
+		when(mockContender.getAddress()).thenReturn(address);
+
+		doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					final UUID uuid = (UUID) invocation.getArguments()[0];
+					service.confirmLeaderSessionID(uuid);
+					return null;
+				}
+		}).when(mockContender).grantLeadership(any(UUID.class));
+
+		return mockContender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
new file mode 100644
index 0000000..537ed20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@link StandaloneHaServices}.
+ */
+public class StandaloneHaServicesTest extends TestLogger {
+
+	private final String jobManagerAddress = "jobManager";
+	private final String resourceManagerAddress = "resourceManager";
+
+	private StandaloneHaServices standaloneHaServices;
+
+	@Before
+	public void setupTest() {
+
+		standaloneHaServices = new StandaloneHaServices(resourceManagerAddress, jobManagerAddress);
+	}
+
+	@After
+	public void teardownTest() throws Exception {
+		if (standaloneHaServices != null) {
+			standaloneHaServices.closeAndCleanupAllData();
+			standaloneHaServices = null;
+		}
+	}
+
+	/**
+	 * Tests that the standalone leader election services return a fixed address and leader session
+	 * id.
+	 */
+	@Test
+	public void testLeaderElection() throws Exception {
+		JobID jobId = new JobID();
+		LeaderContender jmLeaderContender = mock(LeaderContender.class);
+		LeaderContender rmLeaderContender = mock(LeaderContender.class);
+
+		LeaderElectionService jmLeaderElectionService = standaloneHaServices.getJobManagerLeaderElectionService(jobId);
+		LeaderElectionService rmLeaderElectionService = standaloneHaServices.getResourceManagerLeaderElectionService();
+
+		jmLeaderElectionService.start(jmLeaderContender);
+		rmLeaderElectionService.start(rmLeaderContender);
+
+		verify(jmLeaderContender).grantLeadership(eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+		verify(rmLeaderContender).grantLeadership(eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+	}
+
+	/**
+	 * Tests that the standalone leader retrieval services return the specified address and the
+	 * fixed leader session id.
+	 */
+	@Test
+	public void testJobManagerLeaderRetrieval() throws Exception {
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+		LeaderRetrievalListener jmListener1 = mock(LeaderRetrievalListener.class);
+		LeaderRetrievalListener jmListener2 = mock(LeaderRetrievalListener.class);
+		LeaderRetrievalListener rmListener = mock(LeaderRetrievalListener.class);
+
+		LeaderRetrievalService jmLeaderRetrievalService1 = standaloneHaServices.getJobManagerLeaderRetriever(jobId1);
+		LeaderRetrievalService jmLeaderRetrievalService2 = standaloneHaServices.getJobManagerLeaderRetriever(jobId2);
+		LeaderRetrievalService rmLeaderRetrievalService = standaloneHaServices.getResourceManagerLeaderRetriever();
+
+		jmLeaderRetrievalService1.start(jmListener1);
+		jmLeaderRetrievalService2.start(jmListener2);
+		rmLeaderRetrievalService.start(rmListener);
+
+		verify(jmListener1).notifyLeaderAddress(eq(jobManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+		verify(jmListener2).notifyLeaderAddress(eq(jobManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+		verify(rmListener).notifyLeaderAddress(eq(resourceManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
new file mode 100644
index 0000000..06ffe3c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class ZooKeeperRegistryTest extends TestLogger {
+
+	private TestingServer testingServer;
+
+	@Before
+	public void before() throws Exception {
+		testingServer = new TestingServer();
+	}
+
+	@After
+	public void after() throws Exception {
+		testingServer.stop();
+		testingServer = null;
+	}
+
+	/**
+	 * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning()
+	 */
+	@Test
+	public void testZooKeeperRegistry() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		final HighAvailabilityServices zkHaService = new ZooKeeperHaServices(
+				ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+
+		final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+
+		try {
+			JobID jobID = JobID.generate();
+			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
+
+			zkRegistry.setJobRunning(jobID);
+			assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
+
+			zkRegistry.setJobFinished(jobID);
+			assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
+
+			zkRegistry.clearJob(jobID);
+			assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
+		} finally {
+			zkHaService.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 49c28e6..2ac3ea7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -28,8 +28,10 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 
 import org.apache.flink.configuration.Configuration;
@@ -117,10 +119,14 @@ public class JobManagerProcessReapingTest {
 
 			if (jobManagerPort != -1) {
 				try {
-					jobManagerRef = JobManager.getJobManagerActorRef(
-						"akka.tcp",
-						NetUtils.unresolvedHostAndPortToNormalizedString("localhost", jobManagerPort),
-						localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+					final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+						"localhost",
+						jobManagerPort,
+						JobMaster.JOB_MANAGER_NAME,
+						HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+						AkkaRpcServiceUtils.AkkaProtocol.TCP);
+
+					jobManagerRef = AkkaUtils.getActorRef(jobManagerAkkaUrl, localSystem, new FiniteDuration(25L, TimeUnit.SECONDS));
 				} catch (Throwable t) {
 					// job manager probably not ready yet
 					lastError = t;

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index bdff401..fcca173 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -91,7 +91,7 @@ public class JobSubmitTest {
 			MemoryArchivist.class)._1();
 
 		try {
-			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig);
+			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig, false);
 
 			jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
 					lrs,

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 1f1eb62..bc8c0b60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -140,7 +140,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
-		int num = 20;
+		int num = 10;
 
 		ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
 		TestingContender[] contenders = new TestingContender[num];
@@ -194,7 +194,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 				}
 			}
 
-			assertFalse(deadline.isOverdue());
+			assertFalse("Did not complete the leader reelection in time.", deadline.isOverdue());
 			assertEquals(num, numberSeenLeaders);
 
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 6a8ff17..b79093f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -22,10 +22,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.NetUtils;
@@ -35,7 +36,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -104,10 +104,12 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			client[0] = ZooKeeperUtils.startCuratorFramework(config);
 			client[1] = ZooKeeperUtils.startCuratorFramework(config);
 
-			String wrongHostPort = NetUtils.unresolvedHostAndPortToNormalizedString("1.1.1.1", 1234);
-
-			String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-					wrongHostPort, Option.<String>empty());
+			String wrongAddress = AkkaRpcServiceUtils.getRpcUrl(
+				"1.1.1.1",
+				1234,
+				"foobar",
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+				config);
 
 			try {
 				localHost = InetAddress.getLocalHost();
@@ -126,8 +128,12 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
 			String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
 
-			String correctAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-				hostPort, Option.<String>empty());
+			String correctAddress = AkkaRpcServiceUtils.getRpcUrl(
+				localHost.getHostName(),
+				correctInetSocketAddress.getPort(),
+				JobMaster.JOB_MANAGER_NAME,
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+				config);
 
 			faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
 			TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
@@ -192,7 +198,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
-		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
 		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
 
 		assertEquals(InetAddress.getLocalHost(), result);
@@ -214,7 +220,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		@Override
 		public void run() {
 			try {
-				LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+				LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
 				result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
 			} catch (Exception e) {
 				exception = e;

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d3d4d43..cc79c5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -317,6 +317,7 @@ public class TaskExecutorTest extends TestLogger {
 		final ResourceID resourceID = ResourceID.generate();
 		final String resourceManagerAddress = "/resource/manager/address/one";
 		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
+		final String jobManagerAddress = "localhost";
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
@@ -335,7 +336,9 @@ public class TaskExecutorTest extends TestLogger {
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
-			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+			StandaloneHaServices haServices = new StandaloneHaServices(
+				resourceManagerAddress,
+				jobManagerAddress);
 
 			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
 			final SlotReport slotReport = new SlotReport();

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 2fafe5b..d904004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -138,7 +139,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			// is started and the TaskManager is up
 			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
 					"localhost:" + taskManagerPort,
-					TaskManager.TASK_MANAGER_NAME());
+					TaskExecutor.TASK_MANAGER_NAME);
 
 			ActorRef taskManagerRef = null;
 			Throwable lastError = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index f3b1d4a..5f35229 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -182,7 +183,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// available. we give it the regular JobManager akka URL
 				taskManager = createTaskManager(
 						actorSystem,
-						JobManager.getLocalJobManagerAkkaURL(Option.<String>empty()),
+						AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
 						new Configuration(),
 						true,
 						false);
@@ -248,7 +249,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// start the taskManager actor
 				taskManager = createTaskManager(
 						actorSystem,
-						JobManager.getLocalJobManagerAkkaURL(Option.<String>empty()),
+						AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
 						tmConfig,
 						true,
 						false);

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 48c65c0..6e1fb74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -23,15 +23,17 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.JobManagerMode;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -143,13 +145,15 @@ public class JobManagerProcess extends TestJvmProcess {
 	/**
 	 * Returns the Akka URL of this JobManager.
 	 */
-	public String getJobManagerAkkaURL(FiniteDuration timeout) throws InterruptedException {
+	public String getJobManagerAkkaURL(FiniteDuration timeout) throws InterruptedException, UnknownHostException {
 		int port = getJobManagerPort(timeout);
 
-		return JobManager.getRemoteJobManagerAkkaURL(
-				AkkaUtils.getAkkaProtocol(config),
-				NetUtils.unresolvedHostAndPortToNormalizedString("localhost", port),
-				Option.<String>empty());
+		return AkkaRpcServiceUtils.getRpcUrl(
+			"localhost",
+			port,
+			JobMaster.JOB_MANAGER_NAME,
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+			config);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 0daac2e..2404fb9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,7 +20,10 @@ package org.apache.flink.runtime.akka
 
 import java.net.InetSocketAddress
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
 import org.apache.flink.util.NetUtils
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
@@ -38,19 +41,21 @@ class AkkaUtilsTest
 
     val address = new InetSocketAddress(host, port)
 
-    val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
-      "akka.tcp",
-      NetUtils.unresolvedHostAndPortToNormalizedString(host, port),
-      Some("actor"))
+    val remoteAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+      host,
+      port,
+      "actor",
+      AddressResolution.NO_ADDRESS_RESOLUTION,
+      AkkaProtocol.TCP)
 
-    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaURL)
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaUrl)
 
     result should equal(address)
   }
 
   test("getHostFromAkkaURL should throw an exception if the InetSocketAddress cannot be " +
     "retrieved") {
-    val localAkkaURL = JobManager.getLocalJobManagerAkkaURL(Some("actor"))
+    val localAkkaURL = AkkaUtils.getLocalAkkaURL("actor")
 
     intercept[Exception] {
       AkkaUtils.getInetSockeAddressFromAkkaURL(localAkkaURL)

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 1489fb2..97a001d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -59,7 +59,10 @@ class JobManagerConnectionTest {
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
         () => {
           try {
-            JobManager.getJobManagerActorRef(endpoint, actorSystem, config)
+            AkkaUtils.getActorRef(
+              endpoint,
+              actorSystem,
+              AkkaUtils.getLookupTimeout(config))
             fail("Should fail since the JobManager is not reachable")
           }
           catch {
@@ -95,7 +98,10 @@ class JobManagerConnectionTest {
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
         () => {
           try {
-            JobManager.getJobManagerActorRef(endpoint, actorSystem, config)
+            AkkaUtils.getActorRef(
+              endpoint,
+              actorSystem,
+              AkkaUtils.getLookupTimeout(config))
             fail("Should fail since the JobManager is not reachable")
           }
           catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 876e26b..d139a3f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -35,10 +35,12 @@ import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
+import org.apache.flink.runtime.jobmaster.JobMaster
+import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -516,8 +518,8 @@ object TestingUtils {
         actorSystem,
         futureExecutor,
         ioExecutor,
-        Some(prefix + JobManager.JOB_MANAGER_NAME),
-        Some(prefix + JobManager.ARCHIVE_NAME),
+        Some(prefix + JobMaster.JOB_MANAGER_NAME),
+        Some(prefix + JobMaster.ARCHIVE_NAME),
         jobManagerClass,
         classOf[MemoryArchivist])
 
@@ -605,7 +607,6 @@ object TestingUtils {
     new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
-
   class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID])
     extends FlinkActor with LeaderSessionMessageFilter with LogMessages {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index df4f370..5eadba6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -275,7 +275,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 			try {
 				LeaderRetrievalService lrService =
 						LeaderRetrievalUtils.createLeaderRetrievalService(
-								cluster.configuration());
+							cluster.configuration(),
+							false);
 
 				JobExecutionResult result = JobClient.submitJobAndWait(
 						clientActorSystem,