You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:09 UTC

[04/16] flink git commit: [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices

[FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices

This PR introduces a standalone implementation of the high availability services, which can be
used in a distributed setting without any HA guarantees. Additionally, it introduces a common base
class, which is also used by the EmbeddedHaServices. This base class instantiates the
standalone variants of the checkpoint recovery factory, the submitted job graphs store, the running
jobs registry and the blob store.

The StandaloneHaServices are instantiated with a fixed address each for the JobManager and the
ResourceManager. These addresses, together with HighAvailability.DEFAULT_LEADER_ID, are returned by
the corresponding LeaderRetrievalServices when they are started.

This closes #3622.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0bb99c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0bb99c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0bb99c7

Branch: refs/heads/master
Commit: a0bb99c70703f6eaf0953b9ea6e70c5dbb9c1c77
Parents: 43fa507
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 21 15:10:15 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:05:55 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |   8 +-
 .../RemoteExecutorHostnameResolutionTest.java   |   3 +-
 .../apache/flink/client/program/ClientTest.java |  41 +-
 ...rRetrievalServiceHostnameResolutionTest.java |  11 +-
 .../org/apache/flink/storm/api/FlinkClient.java |  16 +-
 .../flink/util/ConfigurationException.java      |  38 ++
 .../MesosApplicationMasterRunner.java           |   7 +-
 .../handlers/HandlerRedirectUtils.java          |   5 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   4 +-
 .../runtime/client/JobListeningContext.java     |   2 +-
 .../highavailability/EmbeddedNonHaServices.java |  70 ---
 .../HighAvailabilityServicesUtils.java          |  90 +++-
 .../runtime/highavailability/NonHaServices.java |  69 ---
 .../highavailability/ZookeeperHaServices.java   | 215 ---------
 .../highavailability/ZookeeperRegistry.java     | 127 -----
 .../SingleLeaderElectionService.java            | 386 ---------------
 .../nonha/AbstractNonHaServices.java            | 117 ++---
 .../nonha/EmbeddedLeaderService.java            | 469 -------------------
 .../highavailability/nonha/NonHaRegistry.java   |  72 ---
 .../nonha/embedded/EmbeddedHaServices.java      | 126 +++++
 .../nonha/embedded/EmbeddedLeaderService.java   | 468 ++++++++++++++++++
 .../SingleLeaderElectionService.java            | 386 +++++++++++++++
 .../nonha/standalone/StandaloneHaServices.java  | 101 ++++
 .../StandaloneRunningJobsRegistry.java          |  72 +++
 .../zookeeper/ZooKeeperHaServices.java          | 217 +++++++++
 .../zookeeper/ZooKeeperRunningJobsRegistry.java | 128 +++++
 .../flink/runtime/jobmaster/JobMaster.java      |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   4 +-
 .../runtime/query/QueryableStateClient.java     |   2 +-
 .../resourcemanager/ResourceManager.java        |   2 +
 .../ResourceManagerConfiguration.java           |   2 +-
 ...urceManagerRuntimeServicesConfiguration.java |   2 +-
 .../exceptions/ConfigurationException.java      |  38 --
 .../slotmanager/SlotManagerConfiguration.java   |   2 +-
 .../flink/runtime/rpc/RpcServiceUtils.java      | 174 -------
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   | 205 ++++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   7 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |   4 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  17 +-
 .../flink/runtime/util/StandaloneUtils.java     |  74 +--
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  19 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 148 +-----
 .../minicluster/LocalFlinkMiniCluster.scala     |  15 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  45 +-
 .../TestingHighAvailabilityServices.java        |   4 +-
 .../highavailability/ZooKeeperRegistryTest.java |  82 ----
 .../SingleLeaderElectionServiceTest.java        | 226 ---------
 .../nonha/embedded/EmbeddedHaServicesTest.java  | 167 +++++++
 .../SingleLeaderElectionServiceTest.java        | 226 +++++++++
 .../standalone/StandaloneHaServicesTest.java    | 104 ++++
 .../zookeeper/ZooKeeperRegistryTest.java        |  85 ++++
 .../JobManagerProcessReapingTest.java           |  16 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../ZooKeeperLeaderElectionTest.java            |   4 +-
 .../ZooKeeperLeaderRetrievalTest.java           |  28 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   7 +-
 .../TaskManagerProcessReapingTestBase.java      |   3 +-
 .../TaskManagerRegistrationTest.java            |   5 +-
 .../runtime/testutils/JobManagerProcess.java    |  18 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  17 +-
 .../jobmanager/JobManagerConnectionTest.scala   |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   9 +-
 .../ZooKeeperLeaderElectionITCase.java          |   3 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   4 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 .../apache/flink/yarn/YarnClusterClient.java    |   2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   7 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |   9 +-
 .../YarnHighAvailabilityServices.java           |   6 +-
 .../YarnIntraNonHaMasterServices.java           |   2 +-
 .../YarnPreConfiguredMasterNonHaServices.java   |  16 +-
 .../YarnPreConfiguredMasterHaServicesTest.java  |  26 +-
 72 files changed, 2712 insertions(+), 2396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index ab4daa9..0f88f7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -241,7 +241,7 @@ public abstract class ClusterClient {
 		try {
 			LeaderConnectionInfo leaderConnectionInfo =
 				LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-					LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);
+					LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true), timeout);
 
 			return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
 		} catch (Exception e) {
@@ -464,7 +464,7 @@ public abstract class ClusterClient {
 	public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
 		final LeaderRetrievalService leaderRetrievalService;
 		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
 		} catch (Exception e) {
 			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
 		}
@@ -498,7 +498,7 @@ public abstract class ClusterClient {
 	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
 		final LeaderRetrievalService leaderRetrievalService;
 		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
 		} catch (Exception e) {
 			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
 		}
@@ -721,7 +721,7 @@ public abstract class ClusterClient {
 	public ActorGateway getJobManagerGateway() throws Exception {
 		LOG.debug("Looking up JobManager");
 		return LeaderRetrievalUtils.retrieveLeaderGateway(
-			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
+			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true),
 			actorSystemLoader.get(),
 			lookupTimeout);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 07edb3a..d8fb3de 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -36,7 +37,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
-public class RemoteExecutorHostnameResolutionTest {
+public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
 	private static final int port = 14451;

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 75cb0e7..da297d6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
@@ -67,7 +67,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
@@ -129,7 +128,9 @@ public class ClientTest extends TestLogger {
 	 */
 	@Test
 	public void testDetachedMode() throws Exception{
-		jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+		jobManagerSystem.actorOf(
+			Props.create(SuccessReturningActor.class),
+			JobMaster.JOB_MANAGER_NAME);
 		ClusterClient out = new StandaloneClusterClient(config);
 		out.setDetached(true);
 
@@ -198,22 +199,18 @@ public class ClientTest extends TestLogger {
 	 * This test verifies correct job submission messaging logic and plan translation calls.
 	 */
 	@Test
-	public void shouldSubmitToJobClient() {
-		try {
-			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+	public void shouldSubmitToJobClient() throws IOException, ProgramInvocationException {
+		jobManagerSystem.actorOf(
+			Props.create(SuccessReturningActor.class),
+			JobMaster.JOB_MANAGER_NAME);
 
-			ClusterClient out = new StandaloneClusterClient(config);
-			out.setDetached(true);
-			JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
+		ClusterClient out = new StandaloneClusterClient(config);
+		out.setDetached(true);
+		JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
 
-			assertNotNull(result);
+		assertNotNull(result);
 
-			program.deleteExtractedLibraries();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		program.deleteExtractedLibraries();
 	}
 
 	/**
@@ -221,7 +218,9 @@ public class ClientTest extends TestLogger {
 	 */
 	@Test
 	public void shouldSubmitToJobClientFails() throws IOException {
-		jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
+			jobManagerSystem.actorOf(
+				Props.create(FailureReturningActor.class),
+				JobMaster.JOB_MANAGER_NAME);
 
 		ClusterClient out = new StandaloneClusterClient(config);
 		out.setDetached(true);
@@ -245,7 +244,9 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void tryLocalExecution() {
 		try {
-			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+			jobManagerSystem.actorOf(
+				Props.create(SuccessReturningActor.class),
+				JobMaster.JOB_MANAGER_NAME);
 			
 			PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
 			when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
@@ -276,7 +277,9 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void testGetExecutionPlan() {
 		try {
-			jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
+			jobManagerSystem.actorOf(
+				Props.create(FailureReturningActor.class),
+				JobMaster.JOB_MANAGER_NAME);
 			
 			PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
 			assertNotNull(prg.getPreviewPlan());

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index dd7d8bc..fc10f65 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -56,7 +56,7 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-			LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
 		}
 		catch (Exception e) {
 			System.err.println("Shouldn't throw an exception!");
@@ -69,7 +69,7 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 	 * Tests that the StandaloneLeaderRetrievalService does not resolve host names by default.
 	 */
 	@Test
-	public void testUnresolvableHostname2() {
+	public void testUnresolvableHostname2() throws Exception {
 
 		try {
 			Configuration config = new Configuration();
@@ -78,16 +78,11 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
 			LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
-			fail("This should fail with an IllegalConfigurationException");
+			fail("This should fail with an UnknownHostException");
 		}
 		catch (UnknownHostException e) {
 			// that is what we want!
 		}
-		catch (Exception e) {
-			System.err.println("Wrong exception!");
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	private static void checkPreconditions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index c58a8ee..2b7f357 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.InvalidTopologyException;
@@ -43,12 +46,10 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Some;
@@ -326,9 +327,14 @@ public class FlinkClient {
 			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
 		}
 
-		return JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
-				NetUtils.unresolvedHostAndPortToNormalizedString(this.jobManagerHost, this.jobManagerPort),
-				actorSystem, AkkaUtils.getLookupTimeout(configuration));
+		final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+			jobManagerHost,
+			jobManagerPort,
+			JobMaster.JOB_MANAGER_NAME,
+			AddressResolution.TRY_ADDRESS_RESOLUTION,
+			configuration);
+
+		return AkkaUtils.getActorRef(jobManagerAkkaUrl, actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-core/src/main/java/org/apache/flink/util/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ConfigurationException.java b/flink-core/src/main/java/org/apache/flink/util/ConfigurationException.java
new file mode 100644
index 0000000..4623fa3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ConfigurationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Exception which occurs when creating a configuration object fails.
+ */
+public class ConfigurationException extends FlinkException {
+	private static final long serialVersionUID = 3971647332059381556L;
+
+	public ConfigurationException(String message) {
+		super(message);
+	}
+
+	public ConfigurationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ConfigurationException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index a23c9f6..0c12745 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -65,6 +66,8 @@ import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
+import scala.Some;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -301,8 +304,8 @@ public class MesosApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
-				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
-				scala.Option.<String>empty(),
+				new Some<>(JobMaster.JOB_MANAGER_NAME),
+				Option.<String>empty(),
 				getJobManagerClass(),
 				getArchivistClass())._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 6616a2a..144e519 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -25,12 +25,11 @@ import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
 
 import java.util.regex.Matcher;
@@ -62,7 +61,7 @@ public class HandlerRedirectUtils {
 		final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
 
 		if (!localJobManagerAddress.equals(leaderAddress) &&
-			!leaderAddress.equals(JobManager.getLocalJobManagerAkkaURL(Option.apply(jobManagerName)))) {
+			!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
 			// We are not the leader and need to redirect
 			Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index c540f74..b0cab78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class BlobUtils {
 		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			return new VoidBlobStore();
 		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
-			return ZookeeperHaServices.createBlobStore(config);
+			return ZooKeeperHaServices.createBlobStore(config);
 		} else {
 			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index b5d7cb7..b944ba8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -135,7 +135,7 @@ public final class JobListeningContext {
 	private ActorGateway getJobManager() throws JobRetrievalException {
 		try {
 			return LeaderRetrievalUtils.retrieveLeaderGateway(
-				LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+				LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true),
 				actorSystem,
 				AkkaUtils.getLookupTimeout(configuration));
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
deleted file mode 100644
index 8bf81eb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
-import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-
-/**
- * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
- * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
- *
- * <p>This implementation has no dependencies on any external services. It returns a fix
- * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or
- * on a local file system and therefore in a storage without guarantees.
- */
-public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
-
-	private final EmbeddedLeaderService resourceManagerLeaderService;
-
-	public EmbeddedNonHaServices() {
-		super();
-		this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-		return resourceManagerLeaderService.createLeaderRetrievalService();
-	}
-
-	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() {
-		return resourceManagerLeaderService.createLeaderElectionService();
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void close() throws Exception {
-		try {
-			super.close();
-		} finally {
-			resourceManagerLeaderService.shutdown();
-		}
-	}
-
-	@Override
-	public void closeAndCleanupAllData() throws Exception {
-		close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index fe180de..106be5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -18,43 +18,111 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.ConfigurationException;
 
+import java.util.concurrent.Executor;
+
+/**
+ * Utils class to instantiate {@link HighAvailabilityServices} implementations.
+ */
 public class HighAvailabilityServicesUtils {
 
-	public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+	/**
+	 * Creates the {@link HighAvailabilityServices} for the high-availability mode configured in
+	 * the given {@link Configuration}: {@link EmbeddedHaServices} for mode {@code NONE} and
+	 * {@link ZooKeeperHaServices} for mode {@code ZOOKEEPER}.
+	 *
+	 * @param config Configuration from which the high-availability mode is read
+	 * @param executor Executor passed on to the created services
+	 * @return The high-availability services for the configured mode
+	 * @throws Exception if the configured mode is not supported or the services cannot be created
+	 */
+	public static HighAvailabilityServices createAvailableOrEmbeddedServices(
+		Configuration config,
+		Executor executor) throws Exception {
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
 
 		switch (highAvailabilityMode) {
 			case NONE:
-				return new EmbeddedNonHaServices();
+				return new EmbeddedHaServices(executor);
 
 			case ZOOKEEPER:
-				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), 
-						Executors.directExecutor(), config);
+				return new ZooKeeperHaServices(
+					ZooKeeperUtils.startCuratorFramework(config),
+					executor,
+					config);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
 	
-	
-	public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
+	/**
+	 * Creates the {@link HighAvailabilityServices} for the high-availability mode configured in
+	 * the given {@link Configuration}.
+	 *
+	 * <p>In mode {@code NONE}, {@link StandaloneHaServices} are returned whose JobManager and
+	 * ResourceManager RPC URLs are derived from the JobManager address and port configured via
+	 * {@link JobManagerOptions#ADDRESS} and {@link JobManagerOptions#PORT}. In mode
+	 * {@code ZOOKEEPER}, {@link ZooKeeperHaServices} are returned.
+	 *
+	 * @param configuration Configuration from which the mode and, for mode {@code NONE}, the
+	 *                      JobManager address are read
+	 * @param executor Executor passed on to the ZooKeeper-based services
+	 * @param addressResolution Address resolution setting passed to {@link AkkaRpcServiceUtils}
+	 *                          when the RPC URLs are built (mode {@code NONE} only)
+	 * @return The high-availability services for the configured mode
+	 * @throws Exception if the mode is not supported, the JobManager address is missing or its
+	 *                   port is invalid (mode {@code NONE}), or the services cannot be created
+	 */
+	public static HighAvailabilityServices createHighAvailabilityServices(
+		Configuration configuration,
+		Executor executor,
+		AddressResolution addressResolution) throws Exception {
+
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
 
 		switch(highAvailabilityMode) {
 			case NONE:
-				final String resourceManagerAddress = null;
-				return new NonHaServices(resourceManagerAddress);
+				final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
+
+				final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+					hostnamePort.f0,
+					hostnamePort.f1,
+					JobMaster.JOB_MANAGER_NAME,
+					addressResolution,
+					configuration);
+				final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+					hostnamePort.f0,
+					hostnamePort.f1,
+					ResourceManager.RESOURCE_MANAGER_NAME,
+					addressResolution,
+					configuration);
+
+				return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
 			case ZOOKEEPER:
-				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), 
-						Executors.directExecutor(), configuration);
+				return new ZooKeeperHaServices(
+					ZooKeeperUtils.startCuratorFramework(configuration),
+					executor,
+					configuration);
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
+
+	/**
+	 * Returns the JobManager's hostname and port extracted from the given
+	 * {@link Configuration}.
+	 *
+	 * @param configuration Configuration to extract the JobManager's address from
+	 * @return The JobManager's hostname and port
+	 * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
+	 */
+	public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
+
+		final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
+		final int port = configuration.getInteger(JobManagerOptions.PORT);
+
+		if (hostname == null) {
+			throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
+				"' is missing (hostname/address of JobManager to connect to).");
+		}
+
+		// valid TCP ports are 1 - 65535
+		if (port <= 0 || port >= 65536) {
+			throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
+				"' (port of the JobManager actor system) : " + port +
+				". It must be greater than 0 and less than 65536.");
+		}
+
+		return Tuple2.of(hostname, port);
+	}
+
+	/**
+	 * Address resolution setting passed to {@link AkkaRpcServiceUtils} when RPC URLs are built.
+	 * NOTE(review): judging by the constant names, this selects whether a resolution of the
+	 * hostname is attempted — confirm against AkkaRpcServiceUtils.
+	 */
+	public enum AddressResolution {
+		TRY_ADDRESS_RESOLUTION,
+		NO_ADDRESS_RESOLUTION
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
deleted file mode 100644
index beb5963..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
-import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case.
- * This implementation can be used for testing, and for cluster setups that do not
- * tolerate failures of the master processes (JobManager, ResourceManager).
- *
- * <p>This implementation has no dependencies on any external services. It returns a fixed
- * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or
- * on a local file system and therefore in a storage without guarantees.
- */
-public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
-
-	/**
-	 * The constant name of the ResourceManager RPC endpoint.
-	 * NOTE(review): not referenced anywhere in this class.
-	 */
-	private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
-
-	/** The fixed address of the ResourceManager */
-	private final String resourceManagerAddress;
-
-	/**
-	 * Creates a new services class for the fixed, pre-defined leaders.
-	 *
-	 * @param resourceManagerAddress    The fixed address of the ResourceManager; must not be null
-	 *                                  (rejected via checkNotNull)
-	 */
-	public NonHaServices(String resourceManagerAddress) {
-		super();
-		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Services
-	// ------------------------------------------------------------------------
-
-	/** Returns a new retrieval service that always reports the fixed address with {@code DEFAULT_LEADER_ID}. */
-	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-		return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
-	}
-
-	/**
-	 * Returns a new single-contender election service that uses {@code DEFAULT_LEADER_ID}.
-	 * Each call creates a separate instance.
-	 */
-	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() {
-		return new SingleLeaderElectionService(getExecutorService(), DEFAULT_LEADER_ID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
deleted file mode 100644
index 4d0db0a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.curator.framework.CuratorFramework;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.FileSystemBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
-
-/**
- * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
- * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
- *
- * <pre>
- * /flink
- *      +/cluster_id_1/resource_manager_lock
- *      |            |
- *      |            +/job-id-1/job_manager_lock
- *      |            |         /checkpoints/latest
- *      |            |                     /latest-1
- *      |            |                     /latest-2
- *      |            |
- *      |            +/job-id-2/job_manager_lock
- *      |
- *      +/cluster_id_2/resource_manager_lock
- *                   |
- *                   +/job-id-1/job_manager_lock
- *                            |/checkpoints/latest
- *                            |            /latest-1
- *                            |/persisted_job_graph
- * </pre>
- *
- * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
- * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
- * accommodate specific permissions.
- *
- * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
- * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
- * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
- *
- * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
- * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
- *
- * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
- * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
- * cluster and participate in the execution of the same set of jobs.
- */
-public class ZookeeperHaServices implements HighAvailabilityServices {
-
-	/** Path used for both the ResourceManager leader election and leader retrieval services. */
-	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
-
-	/** Suffix of the per-job JobManager leader path; see {@link #getPathForJobManager(JobID)}. */
-	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
-
-	// ------------------------------------------------------------------------
-	
-	
-	/** The ZooKeeper client to use */
-	private final CuratorFramework client;
-
-	/** The executor to run ZooKeeper callbacks on */
-	private final Executor executor;
-
-	/** The runtime configuration */
-	private final Configuration configuration;
-
-	/** The zookeeper based running jobs registry */
-	private final RunningJobsRegistry runningJobsRegistry;
-
-	/**
-	 * Creates the services on top of an already constructed ZooKeeper client.
-	 *
-	 * @param client ZooKeeper client shared by all services; closed by {@link #close()}
-	 * @param executor Executor handed to the checkpoint recovery factory and the job graph store
-	 * @param configuration Runtime configuration handed to all ZooKeeper-based services
-	 */
-	public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
-		this.client = checkNotNull(client);
-		this.executor = checkNotNull(executor);
-		this.configuration = checkNotNull(configuration);
-		this.runningJobsRegistry = new ZookeeperRegistry(client, configuration);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Services
-	// ------------------------------------------------------------------------
-
-	/** Creates a new retrieval service watching {@link #RESOURCE_MANAGER_LEADER_PATH}. */
-	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
-	}
-
-	/** Creates a new retrieval service watching the JobManager leader path of the given job. */
-	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
-		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
-	}
-
-	/** Creates a new election service on {@link #RESOURCE_MANAGER_LEADER_PATH}. */
-	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() {
-		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
-	}
-
-	/** Creates a new election service on the JobManager leader path of the given job. */
-	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
-		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
-	}
-
-	/** Creates a new ZooKeeper-backed checkpoint recovery factory on every call. */
-	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
-		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
-	}
-
-	/** Creates a new ZooKeeper-backed submitted job graph store on every call. */
-	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
-	}
-
-	/** Returns the single registry instance created in the constructor. */
-	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() {
-		return runningJobsRegistry;
-	}
-
-	/** Creates a blob store from this instance's configuration; see {@link #createBlobStore(Configuration)}. */
-	@Override
-	public BlobStore createBlobStore() throws IOException {
-		return createBlobStore(configuration);
-	}
-
-	/**
-	 * Creates the BLOB store in which BLOBs are stored in a highly-available
-	 * fashion.
-	 *
-	 * <p>The store is rooted at {@code <HA_STORAGE_PATH>/<HA_CLUSTER_ID>}.
-	 * NOTE(review): the cluster id is appended without validation — confirm what happens
-	 * when {@link HighAvailabilityOptions#HA_CLUSTER_ID} is not set.
-	 *
-	 * @param configuration configuration to extract the storage path from
-	 * @return Blob store
-	 * @throws IOException if the blob store could not be created
-	 * @throws IllegalConfigurationException if the HA storage path is missing or blank
-	 */
-	public static BlobStore createBlobStore(
-		final Configuration configuration) throws IOException {
-		String storagePath = configuration.getValue(
-			HighAvailabilityOptions.HA_STORAGE_PATH);
-		if (isNullOrWhitespaceOnly(storagePath)) {
-			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
-					HighAvailabilityOptions.HA_STORAGE_PATH);
-		}
-
-		final Path path;
-		try {
-			path = new Path(storagePath);
-		} catch (Exception e) {
-			throw new IOException("Invalid path for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-		}
-
-		final FileSystem fileSystem;
-		try {
-			fileSystem = path.getFileSystem();
-		} catch (Exception e) {
-			throw new IOException("Could not create FileSystem for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-		}
-
-		// scope the blob storage to this cluster by appending the cluster id as a sub-directory
-		final String clusterId =
-			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
-		storagePath += "/" + clusterId;
-
-		return new FileSystemBlobStore(fileSystem, storagePath);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Shutdown
-	// ------------------------------------------------------------------------
-
-	/** Closes the shared ZooKeeper client; no other resource is released here. */
-	@Override
-	public void close() throws Exception {
-		client.close();
-	}
-
-	/**
-	 * NOTE(review): despite its name, this implementation deletes no ZooKeeper or blob-store
-	 * data; it only delegates to {@link #close()}.
-	 */
-	@Override
-	public void closeAndCleanupAllData() throws Exception {
-		close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/** Returns {@code "/<jobID>/job_manager_lock"}. */
-	private static String getPathForJobManager(final JobID jobID) {
-		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
deleted file mode 100644
index a8be35a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A ZooKeeper-based {@link RunningJobsRegistry}, highly available.
- *
- * <p>The scheduling status of a job is stored as the UTF-8 encoded name of its
- * {@link JobSchedulingStatus} in a node whose path is the configured
- * {@link HighAvailabilityOptions#ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH} followed by the job ID.
- * A job without such a node (or whose node data is {@code null}) is reported as {@code PENDING}.
- */
-public class ZookeeperRegistry implements RunningJobsRegistry {
-
-	/** Charset used to encode and decode the status names stored in ZooKeeper. */
-	private static final Charset ENCODING = Charset.forName("utf-8");
-
-	/** The ZooKeeper client to use */
-	private final CuratorFramework client;
-
-	/**
-	 * Prefix of all per-job status nodes. NOTE(review): {@link #createZkPath(JobID)} appends the
-	 * job ID without a separator, so this value presumably ends with '/' — confirm the option's default.
-	 */
-	private final String runningJobPath;
-
-	/**
-	 * Creates a registry on top of the given ZooKeeper client.
-	 *
-	 * @param client ZooKeeper client used for all reads and writes; must not be null
-	 * @param configuration Configuration from which the registry path is read
-	 */
-	public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
-		this.client = checkNotNull(client, "client");
-		this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
-	}
-
-	/**
-	 * Stores {@code RUNNING} as the job's status.
-	 *
-	 * @throws IOException wrapping any failure while writing to ZooKeeper
-	 */
-	@Override
-	public void setJobRunning(JobID jobID) throws IOException {
-		checkNotNull(jobID);
-
-		try {
-			writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
-		}
-		catch (Exception e) {
-			throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
-		}
-	}
-
-	/**
-	 * Stores {@code DONE} as the job's status.
-	 *
-	 * @throws IOException wrapping any failure while writing to ZooKeeper
-	 */
-	@Override
-	public void setJobFinished(JobID jobID) throws IOException {
-		checkNotNull(jobID);
-
-		try {
-			writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
-		}
-		catch (Exception e) {
-			throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
-		}
-	}
-
-	/**
-	 * Reads the job's status from ZooKeeper, falling back to {@code PENDING} when the node does
-	 * not exist or holds {@code null} data.
-	 *
-	 * <p>NOTE(review): the "corrupt data" IOException is thrown inside the outer try block and is
-	 * therefore re-wrapped by the generic catch below; it also drops the IllegalArgumentException cause.
-	 *
-	 * @throws IOException if reading from ZooKeeper fails or the stored data is not a valid status name
-	 */
-	@Override
-	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
-		checkNotNull(jobID);
-
-		try {
-			final String zkPath = createZkPath(jobID);
-			final Stat stat = client.checkExists().forPath(zkPath);
-			if (stat != null) {
-				// found some data, try to parse it
-				final byte[] data = client.getData().forPath(zkPath);
-				if (data != null) {
-					try {
-						final String name = new String(data, ENCODING);
-						return JobSchedulingStatus.valueOf(name);
-					}
-					catch (IllegalArgumentException e) {
-						throw new IOException("Found corrupt data in ZooKeeper: " + 
-								Arrays.toString(data) + " is no valid job status");
-					}
-				}
-			}
-
-			// nothing found, yet, must be in status 'PENDING'
-			return JobSchedulingStatus.PENDING;
-		}
-		catch (Exception e) {
-			throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
-		}
-	}
-
-	/**
-	 * Removes the job's status node.
-	 *
-	 * @throws IOException wrapping any failure while talking to ZooKeeper
-	 */
-	@Override
-	public void clearJob(JobID jobID) throws IOException {
-		checkNotNull(jobID);
-
-		try {
-			final String zkPath = createZkPath(jobID);
-			// presumably ensures the node exists so that the following delete does not fail
-			// for jobs that were never registered — confirm against Curator's EnsurePath semantics
-			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-			this.client.delete().forPath(zkPath);
-		}
-		catch (Exception e) {
-			throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
-		}
-	}
-
-	/** Builds the node path as {@code runningJobPath + jobID} (no separator is inserted). */
-	private String createZkPath(JobID jobID) {
-		return runningJobPath + jobID.toString();
-	}
-
-	/** Ensures the job's node exists and overwrites its data with the given status name. */
-	private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
-		final String zkPath = createZkPath(jobID);
-		this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-		this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
deleted file mode 100644
index 96e1390..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.leaderelection;
-
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An implementation of the {@link LeaderElectionService} interface that handles a single
- * leader contender. When started, this service immediately grants the contender the leadership.
- *
- * <p>The implementation accepts a single static leader session ID and is hence compatible with
- * pre-configured single leader (no leader failover) setups.
- *
- * <p>This implementation supports a series of leader listeners that receive notifications about
- * the leader contender.
- *
- * <p>{@link #start(LeaderContender)} hands the leadership grant to the notification executor.
- * Listeners are only told about the leader after the contender has called
- * {@link #confirmLeaderSessionID(UUID)}.
- */
-public class SingleLeaderElectionService implements LeaderElectionService {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
-
-	// ------------------------------------------------------------------------
-
-	/** lock for all operations on this instance */
-	private final Object lock = new Object();
-
-	/** The executor service that dispatches notifications */
-	private final Executor notificationExecutor;
-
-	/** The leader ID assigned to the immediate leader */
-	private final UUID leaderId;
-
-	/** The retrieval services currently started against this election service. */
-	@GuardedBy("lock")
-	private final HashSet<EmbeddedLeaderRetrievalService> listeners;
-
-	/** The currently proposed leader */
-	@GuardedBy("lock")
-	private volatile LeaderContender proposedLeader;
-
-	/** The confirmed leader */
-	@GuardedBy("lock")
-	private volatile LeaderContender leader;
-
-	/** The address of the confirmed leader */
-	@GuardedBy("lock")
-	private volatile String leaderAddress;
-
-	/**
-	 * Flag marking this service as shutdown, meaning it cannot be started again.
-	 * Written under the lock; volatile because {@link #isShutdown()} and
-	 * {@link #createLeaderRetrievalService()} read it without holding the lock.
-	 */
-	@GuardedBy("lock")
-	private volatile boolean shutdown;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new leader election service. The service assigns the given leader ID
-	 * to the leader contender.
-	 *
-	 * @param notificationsDispatcher Executor on which leadership grants and leader notifications are run.
-	 * @param leaderId The constant leader ID assigned to the leader.
-	 */
-	public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
-		this.notificationExecutor = checkNotNull(notificationsDispatcher);
-		this.leaderId = checkNotNull(leaderId);
-		this.listeners = new HashSet<>();
-
-		shutdown = false;
-	}
-
-	// ------------------------------------------------------------------------
-	//  leader election service
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Proposes the given contender as leader and schedules the grant of leadership on the
-	 * notification executor.
-	 *
-	 * @throws IllegalStateException if the service is shut down or a contender was already proposed
-	 */
-	@Override
-	public void start(LeaderContender contender) throws Exception {
-		checkNotNull(contender, "contender");
-
-		synchronized (lock) {
-			checkState(!shutdown, "service is shut down");
-			checkState(proposedLeader == null, "service already started");
-
-			// directly grant leadership to the given contender
-			proposedLeader = contender;
-			notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId));
-		}
-	}
-
-	/**
-	 * Tells all listeners that there is no leader, revokes the leadership of a confirmed leader,
-	 * and clears the leader state so that {@link #start(LeaderContender)} can be called again.
-	 * Registered listeners stay registered.
-	 */
-	@Override
-	public void stop() {
-		synchronized (lock) {
-			// notify all listeners that there is no leader
-			for (EmbeddedLeaderRetrievalService listener : listeners) {
-				notificationExecutor.execute(
-						new NotifyOfLeaderCall(null, null, listener.listener, LOG));
-			}
-
-			// if there was a leader, revoke its leadership
-			// NOTE(review): unlike the grant, the revocation runs synchronously on the calling
-			// thread while holding the lock, i.e. contender code is invoked under the lock
-			if (leader != null) {
-				try {
-					leader.revokeLeadership();
-				} catch (Throwable t) {
-					leader.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-				}
-			}
-
-			proposedLeader = null;
-			leader = null;
-			leaderAddress = null;
-		}
-	}
-
-	/**
-	 * Confirms the proposed contender as leader and notifies all listeners of its address.
-	 * The argument checks run before the lock is acquired.
-	 *
-	 * @throws IllegalArgumentException if the given ID differs from the configured leader ID
-	 * @throws IllegalStateException if shut down, no leader was proposed, or a leader was already confirmed
-	 */
-	@Override
-	public void confirmLeaderSessionID(UUID leaderSessionID) {
-		checkNotNull(leaderSessionID, "leaderSessionID");
-		checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
-
-		synchronized (lock) {
-			checkState(!shutdown, "service is shut down");
-			checkState(proposedLeader != null, "no leader proposed yet");
-			checkState(leader == null, "leader already confirmed");
-
-			// accept the confirmation
-			final String address = proposedLeader.getAddress();
-			leaderAddress = address;
-			leader = proposedLeader;
-
-			// notify all listeners
-			for (EmbeddedLeaderRetrievalService listener : listeners) {
-				notificationExecutor.execute(
-						new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
-			}
-		}
-	}
-
-	/** Returns true once a proposed leader has confirmed its leader session ID. */
-	@Override
-	public boolean hasLeadership() {
-		synchronized (lock) {
-			return leader != null;
-		}
-	}
-
-	/**
-	 * Called by {@link GrantLeadershipCall} when granting leadership failed. Forwards the error to
-	 * the contender (outside the lock) and then withdraws the proposal if it is still current.
-	 * NOTE(review): the log message below mentions a leader listener although the failure
-	 * occurred while granting leadership to a contender.
-	 */
-	void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
-		LOG.warn("Error notifying leader listener about new leader", error);
-		contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
-		
-		synchronized (lock) {
-			if (proposedLeader == contender) {
-				proposedLeader = null;
-				leader = null;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  shutdown
-	// ------------------------------------------------------------------------
-
-	/** Returns whether this service was shut down; reads the volatile flag without locking. */
-	public boolean isShutdown() {
-		return shutdown;
-	}
-
-	/** Permanently shuts down the service; see {@link #shutdownInternally(Exception)}. */
-	public void shutdown() {
-		shutdownInternally(new Exception("The leader service is shutting down"));
-	}
-
-	/**
-	 * Marks the service as shut down (only the first call has an effect), reports the given
-	 * exception to the confirmed leader and to all running listeners, and clears all state.
-	 * Exceptions thrown by those handlers are ignored.
-	 */
-	private void shutdownInternally(Exception exceptionForHandlers) {
-		synchronized (lock) {
-			if (shutdown) {
-				return;
-			}
-
-			shutdown = true;
-
-			// fail the leader (if there is one)
-			if (leader != null) {
-				try {
-					leader.handleError(exceptionForHandlers);
-				} catch (Throwable ignored) {}
-			}
-
-			// clear all leader status
-			leader = null;
-			proposedLeader = null;
-			leaderAddress = null;
-
-			// fail all registered listeners
-			for (EmbeddedLeaderRetrievalService service : listeners) {
-				service.shutdown(exceptionForHandlers);
-			}
-			listeners.clear();
-		}
-	}
-
-	/** Logs the error and shuts the whole service down. */
-	private void fatalError(Throwable error) {
-		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
-
-		shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
-	}
-
-	// ------------------------------------------------------------------------
-	//  leader listeners
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new retrieval service bound to this election service. The shutdown check reads
-	 * the volatile flag without taking the lock.
-	 */
-	public LeaderRetrievalService createLeaderRetrievalService() {
-		checkState(!shutdown, "leader election service is shut down");
-		return new EmbeddedLeaderRetrievalService();
-	}
-
-	/**
-	 * Registers a started retrieval service and, if a leader is already confirmed, schedules a
-	 * notification about it for the new listener.
-	 *
-	 * <p>The two precondition checks throw to the caller. Any failure inside the try block,
-	 * including a duplicate registration, is instead treated as fatal and shuts down the whole service.
-	 */
-	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
-		synchronized (lock) {
-			checkState(!shutdown, "leader election service is shut down");
-			checkState(!service.running, "leader retrieval service is already started");
-
-			try {
-				if (!listeners.add(service)) {
-					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
-				}
-
-				service.listener = listener;
-				service.running = true;
-
-				// if we already have a leader, immediately notify this new listener
-				if (leader != null) {
-					notificationExecutor.execute(
-							new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
-				}
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	/**
-	 * Unregisters a running retrieval service. This is a no-op if the service is not running or
-	 * this election service is shut down. Failures are treated as fatal.
-	 */
-	void removeListener(EmbeddedLeaderRetrievalService service) {
-		synchronized (lock) {
-			// if the service was not even started, simply do nothing
-			if (!service.running || shutdown) {
-				return;
-			}
-
-			try {
-				if (!listeners.remove(service)) {
-					throw new IllegalStateException("leader retrieval service does not belong to this service");
-				}
-
-				// stop the service
-				service.listener = null;
-				service.running = false;
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Retrieval service handed out by {@link #createLeaderRetrievalService()}. It is a non-static
-	 * inner class so that it can delegate to the enclosing service's add/remove methods; its
-	 * fields are written by those methods under the enclosing lock.
-	 */
-	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
-
-		volatile LeaderRetrievalListener listener;
-
-		volatile boolean running;
-
-		@Override
-		public void start(LeaderRetrievalListener listener) throws Exception {
-			checkNotNull(listener);
-			addListener(this, listener);
-		}
-
-		@Override
-		public void stop() throws Exception {
-			removeListener(this);
-		}
-
-		/**
-		 * Marks this retrieval service as stopped and reports the cause to its listener
-		 * (exceptions from the listener are ignored). Called from shutdownInternally under the lock.
-		 */
-		void shutdown(Exception cause) {
-			if (running) {
-				final LeaderRetrievalListener lst = listener;
-				running = false;
-				listener = null;
-
-				try {
-					lst.handleError(cause);
-				} catch (Throwable ignored) {}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  asynchronous notifications
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This runnable informs a leader contender that it gained leadership.
-	 * It is non-static so that failures can be reported via errorOnGrantLeadership.
-	 */
-	private class GrantLeadershipCall implements Runnable {
-
-		private final LeaderContender contender;
-		private final UUID leaderSessionId;
-
-		GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
-
-			this.contender = checkNotNull(contender);
-			this.leaderSessionId = checkNotNull(leaderSessionId);
-		}
-
-		@Override
-		public void run() {
-			try {
-				contender.grantLeadership(leaderSessionId);
-			}
-			catch (Throwable t) {
-				errorOnGrantLeadership(contender, t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This runnable informs a leader listener of a new leader (or of no leader, when both
-	 * address and session ID are null). If the listener throws, the error is logged and
-	 * forwarded to the listener's handleError. NOTE(review): an exception thrown by
-	 * handleError itself escapes to the executor.
-	 */
-	private static class NotifyOfLeaderCall implements Runnable {
-
-		@Nullable
-		private final String address;       // null if leader revoked without new leader
-		@Nullable
-		private final UUID leaderSessionId; // null if leader revoked without new leader
-
-		private final LeaderRetrievalListener listener;
-		private final Logger logger;
-
-		NotifyOfLeaderCall(
-				@Nullable String address,
-				@Nullable UUID leaderSessionId,
-				LeaderRetrievalListener listener,
-				Logger logger) {
-
-			this.address = address;
-			this.leaderSessionId = leaderSessionId;
-			this.listener = checkNotNull(listener);
-			this.logger = checkNotNull(logger);
-		}
-
-		@Override
-		public void run() {
-			try {
-				listener.notifyLeaderAddress(address, leaderSessionId);
-			}
-			catch (Throwable t) {
-				logger.warn("Error notifying leader listener about new leader", t);
-				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-			}
-		}
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index b10e414..ac90e3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -18,131 +18,85 @@
 
 package org.apache.flink.runtime.highavailability.nonha;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Base for all {@link HighAvailabilityServices} that are not highly available, but are backed
- * by storage that has no availability guarantees and leader election services that cannot
- * elect among multiple distributed leader contenders.
+ * Abstract base class for non high-availability services.
+ *
+ * This class returns the standalone variants for the checkpoint recovery factory, the submitted
+ * job graph store, the running jobs registry and the blob store.
  */
 public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+	protected final Object lock = new Object();
 
-	private final Object lock = new Object();
-
-	private final ExecutorService executor;
-
-	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
-
-	private final NonHaRegistry runningJobsRegistry;
+	private final RunningJobsRegistry runningJobsRegistry;
 
 	private boolean shutdown;
 
-	// ------------------------------------------------------------------------
-
 	public AbstractNonHaServices() {
-		this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
-		this.jobManagerLeaderServices = new HashMap<>();
-		this.runningJobsRegistry = new NonHaRegistry();
+		this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+
+		shutdown = false;
 	}
 
-	// ------------------------------------------------------------------------
-	//  services
-	// ------------------------------------------------------------------------
+	// ----------------------------------------------------------------------
+	// HighAvailabilityServices method implementations
+	// ----------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
-		checkNotNull(jobID);
-
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		synchronized (lock) {
 			checkNotShutdown();
-			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
-			return service.createLeaderRetrievalService();
+
+			return new StandaloneCheckpointRecoveryFactory();
 		}
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
-		checkNotNull(jobID);
-
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
 		synchronized (lock) {
 			checkNotShutdown();
-			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
-			return service.createLeaderElectionService();
-		}
-	}
 
-	@GuardedBy("lock")
-	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
-		EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
-		if (service == null) {
-			service = new EmbeddedLeaderService(executor);
-			jobManagerLeaderServices.put(jobID, service);
+			return new StandaloneSubmittedJobGraphStore();
 		}
-		return service;
-	}
-
-	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
-		checkNotShutdown();
-		return new StandaloneCheckpointRecoveryFactory();
 	}
 
 	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() {
-		checkNotShutdown();
-		return new StandaloneSubmittedJobGraphStore();
-	}
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		synchronized (lock) {
+			checkNotShutdown();
 
-	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() {
-		checkNotShutdown();
-		return runningJobsRegistry;
+			return runningJobsRegistry;
+		}
 	}
 
 	@Override
 	public BlobStore createBlobStore() throws IOException {
-		checkNotShutdown();
-		return new VoidBlobStore();
-	}
+		synchronized (lock) {
+			checkNotShutdown();
 
-	// ------------------------------------------------------------------------
-	//  shutdown
-	// ------------------------------------------------------------------------
+			return new VoidBlobStore();
+		}
+	}
 
 	@Override
 	public void close() throws Exception {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
-
-				// no further calls should be dispatched
-				executor.shutdownNow();
-
-				// stop all job manager leader services
-				for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
-					service.shutdown();
-				}
-				jobManagerLeaderServices.clear();
 			}
 		}
 	}
@@ -153,15 +107,16 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 		close();
 	}
 
-	private void checkNotShutdown() {
+	// ----------------------------------------------------------------------
+	// Helper methods
+	// ----------------------------------------------------------------------
+
+	@GuardedBy("lock")
+	protected void checkNotShutdown() {
 		checkState(!shutdown, "high availability services are shut down");
 	}
 
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	protected ExecutorService getExecutorService() {
-		return executor;
+	protected boolean isShutDown() {
+		return shutdown;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
deleted file mode 100644
index d4eba26..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.nonha;
-
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * A simple leader election service, which selects a leader among contenders and notifies listeners.
- * 
- * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
- * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
- */
-public class EmbeddedLeaderService {
-
-	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
-
-	private final Object lock = new Object();
-
-	private final Executor notificationExecutor;
-
-	private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
-
-	private final Set<EmbeddedLeaderRetrievalService> listeners;
-
-	/** proposed leader, which has been notified of leadership grant, but has not confirmed */
-	private EmbeddedLeaderElectionService currentLeaderProposed;
-
-	/** actual leader that has confirmed leadership and of which listeners have been notified */
-	private EmbeddedLeaderElectionService currentLeaderConfirmed;
-
-	/** fencing UID for the current leader (or proposed leader) */
-	private UUID currentLeaderSessionId;
-
-	/** the cached address of the current leader */
-	private String currentLeaderAddress;
-
-	/** flag marking the service as terminated */
-	private boolean shutdown;
-
-	// ------------------------------------------------------------------------
-
-	public EmbeddedLeaderService(ExecutorService notificationsDispatcher) {
-		this.notificationExecutor = checkNotNull(notificationsDispatcher);
-		this.allLeaderContenders = new HashSet<>();
-		this.listeners = new HashSet<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  shutdown and errors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Shuts down this leader election service.
-	 * 
-	 * <p>This method does not perform a clean revocation of the leader status and
-	 * no notification to any leader listeners. It simply notifies all contenders
-	 * and listeners that the service is no longer available.
-	 */
-	public void shutdown() {
-		synchronized (lock) {
-			shutdownInternally(new Exception("Leader election service is shutting down"));
-		}
-	}
-
-	private void fatalError(Throwable error) {
-		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
-
-		synchronized (lock) {
-			shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
-		}
-	}
-
-	@GuardedBy("lock")
-	private void shutdownInternally(Exception exceptionForHandlers) {
-		assert Thread.holdsLock(lock);
-
-		if (!shutdown) {
-			// clear all leader status
-			currentLeaderProposed = null;
-			currentLeaderConfirmed = null;
-			currentLeaderSessionId = null;
-			currentLeaderAddress = null;
-
-			// fail all registered listeners
-			for (EmbeddedLeaderElectionService service : allLeaderContenders) {
-				service.shutdown(exceptionForHandlers);
-			}
-			allLeaderContenders.clear();
-
-			// fail all registered listeners
-			for (EmbeddedLeaderRetrievalService service : listeners) {
-				service.shutdown(exceptionForHandlers);
-			}
-			listeners.clear();
-
-			shutdown = true;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  creating contenders and listeners
-	// ------------------------------------------------------------------------
-
-	public LeaderElectionService createLeaderElectionService() {
-		checkState(!shutdown, "leader election service is shut down");
-		return new EmbeddedLeaderElectionService();
-	}
-
-	public LeaderRetrievalService createLeaderRetrievalService() {
-		checkState(!shutdown, "leader election service is shut down");
-		return new EmbeddedLeaderRetrievalService();
-	}
-
-	// ------------------------------------------------------------------------
-	//  adding and removing contenders & listeners
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Callback from leader contenders when they start their service.
-	 */
-	void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
-		synchronized (lock) {
-			checkState(!shutdown, "leader election service is shut down");
-			checkState(!service.running, "leader election service is already started");
-
-			try {
-				if (!allLeaderContenders.add(service)) {
-					throw new IllegalStateException("leader election service was added to this service multiple times");
-				}
-
-				service.contender = contender;
-				service.running = true;
-
-				updateLeader();
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	/**
-	 * Callback from leader contenders when they stop their service.
-	 */
-	void removeContender(EmbeddedLeaderElectionService service) {
-		synchronized (lock) {
-			// if the service was not even started, simply do nothing
-			if (!service.running || shutdown) {
-				return;
-			}
-
-			try {
-				if (!allLeaderContenders.remove(service)) {
-					throw new IllegalStateException("leader election service does not belong to this service");
-				}
-
-				// stop the service
-				service.contender = null;
-				service.running = false;
-				service.isLeader = false;
-
-				// if that was the current leader, unset its status
-				if (currentLeaderConfirmed == service) {
-					currentLeaderConfirmed = null;
-					currentLeaderSessionId = null;
-					currentLeaderAddress = null;
-				}
-				if (currentLeaderProposed == service) {
-					currentLeaderProposed = null;
-					currentLeaderSessionId = null;
-				}
-
-				updateLeader();
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	/**
-	 * Callback from leader contenders when they confirm a leader grant
-	 */
-	void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
-		synchronized (lock) {
-			// if the service was shut down in the meantime, ignore this confirmation
-			if (!service.running || shutdown) {
-				return;
-			}
-
-			try {
-				// check if the confirmation is for the same grant, or whether it is a stale grant 
-				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
-					final String address = service.contender.getAddress();
-					LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
-
-					// mark leadership
-					currentLeaderConfirmed = service;
-					currentLeaderAddress = address;
-					currentLeaderProposed = null;
-					service.isLeader = true;
-
-					// notify all listeners
-					for (EmbeddedLeaderRetrievalService listener : listeners) {
-						notificationExecutor.execute(
-								new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
-					}
-				}
-				else {
-					LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
-				}
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	@GuardedBy("lock")
-	private void updateLeader() {
-		// this must be called under the lock
-		assert Thread.holdsLock(lock);
-
-		if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
-			// we need a new leader
-			if (allLeaderContenders.isEmpty()) {
-				// no new leader available, tell everyone that there is no leader currently
-				for (EmbeddedLeaderRetrievalService listener : listeners) {
-					notificationExecutor.execute(
-							new NotifyOfLeaderCall(null, null, listener.listener, LOG));
-				}
-			}
-			else {
-				// propose a leader and ask it
-				final UUID leaderSessionId = UUID.randomUUID();
-				EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
-
-				currentLeaderSessionId = leaderSessionId;
-				currentLeaderProposed = leaderService;
-
-				LOG.info("Proposing leadership to contender {} @ {}",
-						leaderService.contender, leaderService.contender.getAddress());
-
-				notificationExecutor.execute(
-						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
-			}
-		}
-	}
-
-	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
-		synchronized (lock) {
-			checkState(!shutdown, "leader election service is shut down");
-			checkState(!service.running, "leader retrieval service is already started");
-
-			try {
-				if (!listeners.add(service)) {
-					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
-				}
-
-				service.listener = listener;
-				service.running = true;
-
-				// if we already have a leader, immediately notify this new listener
-				if (currentLeaderConfirmed != null) {
-					notificationExecutor.execute(
-							new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
-				}
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	void removeListener(EmbeddedLeaderRetrievalService service) {
-		synchronized (lock) {
-			// if the service was not even started, simply do nothing
-			if (!service.running || shutdown) {
-				return;
-			}
-
-			try {
-				if (!listeners.remove(service)) {
-					throw new IllegalStateException("leader retrieval service does not belong to this service");
-				}
-
-				// stop the service
-				service.listener = null;
-				service.running = false;
-			}
-			catch (Throwable t) {
-				fatalError(t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  election and retrieval service implementations 
-	// ------------------------------------------------------------------------
-
-	private class EmbeddedLeaderElectionService implements LeaderElectionService {
-
-		volatile LeaderContender contender;
-
-		volatile boolean isLeader;
-
-		volatile boolean running;
-
-		@Override
-		public void start(LeaderContender contender) throws Exception {
-			checkNotNull(contender);
-			addContender(this, contender);
-		}
-
-		@Override
-		public void stop() throws Exception {
-			removeContender(this);
-		}
-
-		@Override
-		public void confirmLeaderSessionID(UUID leaderSessionID) {
-			checkNotNull(leaderSessionID);
-			confirmLeader(this, leaderSessionID);
-		}
-
-		@Override
-		public boolean hasLeadership() {
-			return isLeader;
-		}
-
-		void shutdown(Exception cause) {
-			if (running) {
-				running = false;
-				isLeader = false;
-				contender.handleError(cause);
-				contender = null;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
-
-		volatile LeaderRetrievalListener listener;
-
-		volatile boolean running;
-
-		@Override
-		public void start(LeaderRetrievalListener listener) throws Exception {
-			checkNotNull(listener);
-			addListener(this, listener);
-		}
-
-		@Override
-		public void stop() throws Exception {
-			removeListener(this);
-		}
-
-		public void shutdown(Exception cause) {
-			if (running) {
-				running = false;
-				listener.handleError(cause);
-				listener = null;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  asynchronous notifications
-	// ------------------------------------------------------------------------
-
-	private static class NotifyOfLeaderCall implements Runnable {
-
-		@Nullable
-		private final String address;       // null if leader revoked without new leader
-		@Nullable
-		private final UUID leaderSessionId; // null if leader revoked without new leader
-
-		private final LeaderRetrievalListener listener;
-		private final Logger logger;
-
-		NotifyOfLeaderCall(
-				@Nullable String address,
-				@Nullable UUID leaderSessionId,
-				LeaderRetrievalListener listener,
-				Logger logger) {
-
-			this.address = address;
-			this.leaderSessionId = leaderSessionId;
-			this.listener = checkNotNull(listener);
-			this.logger = checkNotNull(logger);
-		}
-
-		@Override
-		public void run() {
-			try {
-				listener.notifyLeaderAddress(address, leaderSessionId);
-			}
-			catch (Throwable t) {
-				logger.warn("Error notifying leader listener about new leader", t);
-				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static class GrantLeadershipCall implements Runnable {
-
-		private final LeaderContender contender;
-		private final UUID leaderSessionId;
-		private final Logger logger;
-
-		GrantLeadershipCall(
-				LeaderContender contender,
-				UUID leaderSessionId,
-				Logger logger) {
-
-			this.contender = checkNotNull(contender);
-			this.leaderSessionId = checkNotNull(leaderSessionId);
-			this.logger = checkNotNull(logger);
-		}
-
-		@Override
-		public void run() {
-			try {
-				contender.grantLeadership(leaderSessionId);
-			}
-			catch (Throwable t) {
-				logger.warn("Error granting leadership to contender", t);
-				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-			}
-		}
-	}
-}