You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:15 UTC

[10/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-python/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/resources/log4j-test.properties b/flink-libraries/flink-python/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..18b51cc
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-python/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/resources/logback-test.xml b/flink-libraries/flink-python/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1d64d46
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] [%X{sourceThread} - %X{akkaSource}] %-5level %logger{60} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+    <!-- The following loggers are disabled during tests, because many tests deliberately
+         throw errors to test failure scenarios. Logging those would flood the log. -->
+         <!---->
+    <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+    <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
+    <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+    <logger name="org.apache.flink.runtime.testingUtils" level="OFF"/>
+    <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+    <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
+    <logger name="org.apache.flink.runtime.instance.InstanceManager" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 0c12745..5513df4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -48,17 +48,17 @@ import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.mesos.Protos;
@@ -67,7 +67,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Option;
-import scala.Some;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -206,6 +205,7 @@ public class MesosApplicationMasterRunner {
 		ScheduledExecutorService futureExecutor = null;
 		ExecutorService ioExecutor = null;
 		MesosServices mesosServices = null;
+		HighAvailabilityServices highAvailabilityServices = null;
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------
@@ -295,6 +295,12 @@ public class MesosApplicationMasterRunner {
 			// 3) Resource Master for Mesos
 			// 4) Process reapers for the JobManager and Resource Master
 
+			// 0: Start the JobManager services
+			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				config,
+				ioExecutor,
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
 			// 1: the JobManager
 			LOG.debug("Starting JobManager actor");
 
@@ -304,8 +310,9 @@ public class MesosApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
-				new Some<>(JobMaster.JOB_MANAGER_NAME),
-				Option.<String>empty(),
+				highAvailabilityServices,
+				Option.apply(JobMaster.JOB_MANAGER_NAME),
+				Option.apply(JobMaster.ARCHIVE_NAME),
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
@@ -313,7 +320,12 @@ public class MesosApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
-			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
+				config,
+				highAvailabilityServices,
+				actorSystem,
+				jobManager,
+				LOG);
 			if(webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
 				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
@@ -327,17 +339,12 @@ public class MesosApplicationMasterRunner {
 				config,
 				ioExecutor);
 
-			// we need the leader retrieval service here to be informed of new
-			// leader session IDs, even though there can be only one leader ever
-			LeaderRetrievalService leaderRetriever =
-				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
-
 			Props resourceMasterProps = MesosFlinkResourceManager.createActorProps(
 				getResourceManagerClass(),
 				config,
 				mesosConfig,
 				workerStore,
-				leaderRetriever,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				taskManagerParameters,
 				taskManagerContainerSpec,
 				artifactServer,
@@ -435,6 +442,14 @@ public class MesosApplicationMasterRunner {
 			LOG.error("Failed to stop the artifact server", t);
 		}
 
+		if (highAvailabilityServices != null) {
+			try {
+				highAvailabilityServices.close();
+			} catch (Throwable t) {
+				LOG.error("Could not properly stop the high availability services.");
+			}
+		}
+
 		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7915e8a..7fe5db5 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -43,11 +43,12 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.Protos;
@@ -142,11 +143,10 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 	/**
 	 * The context fixture.
 	 */
-	static class Context extends JavaTestKit {
+	static class Context extends JavaTestKit implements AutoCloseable {
 
 		// mocks
 		public ActorGateway jobManager;
-		public LeaderRetrievalService retrievalService;
 		public MesosConfiguration mesosConfig;
 		public MesosWorkerStore workerStore;
 		public MesosArtifactResolver artifactResolver;
@@ -163,6 +163,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 		public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build();
 		public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build();
 
+		private final TestingHighAvailabilityServices highAvailabilityServices;
+
 		/**
 		 * Create mock RM dependencies.
 		 */
@@ -170,8 +172,19 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 			super(system);
 
 			try {
-				jobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-				retrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager.actor());
+				jobManager = TestingUtils.createForwardingActor(
+					system,
+					getTestActor(),
+					HighAvailabilityServices.DEFAULT_LEADER_ID,
+					Option.<String>empty());
+
+				highAvailabilityServices = new TestingHighAvailabilityServices();
+
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new TestingLeaderRetrievalService(
+						jobManager.path(),
+						HighAvailabilityServices.DEFAULT_LEADER_ID));
 
 				// scheduler driver
 				schedulerDriver = mock(SchedulerDriver.class);
@@ -206,7 +219,14 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 			TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
 				TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
 					TestingMesosFlinkResourceManager.class,
-					config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
+					config,
+					mesosConfig,
+					workerStore,
+					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+					tmParams,
+					containerSpecification,
+					artifactResolver,
+					LOG));
 			resourceManagerInstance = resourceManagerRef.underlyingActor();
 			resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
@@ -235,6 +255,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 				.setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo))
 				).build();
 		}
+
+		@Override
+		public void close() throws Exception {
+			highAvailabilityServices.closeAndCleanupAllData();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index e80c509..f31c932 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -103,6 +104,12 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 
 			jobGraph.addVertex(task);
 
+			final Configuration config = new Configuration();
+
+			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
 			ActorGateway jobManger = null;
 			ActorGateway taskManager = null;
 
@@ -125,13 +132,17 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 					testActorSystem,
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
-					new Configuration());
+					config,
+					highAvailabilityServices);
 
-				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
-						testActorSystem, jobManger, config, true, true);
+					testActorSystem,
+					highAvailabilityServices,
+					config,
+					true,
+					true);
 
 				final ActorGateway jm = jobManger;
 
@@ -264,6 +275,8 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 				TestingUtils.stopActor(jobManger);
 				TestingUtils.stopActor(taskManager);
 
+				highAvailabilityServices.closeAndCleanupAllData();
+
 				for (Buffer buf : buffers) {
 					buf.recycle();
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 5463384..b67e735 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -89,6 +90,12 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 
 			jobGraph.addVertex(task);
 
+			final Configuration config = new Configuration();
+
+			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
 			ActorGateway jobManger = null;
 			ActorGateway taskManager = null;
 
@@ -97,13 +104,17 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 					testActorSystem,
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
-					new Configuration());
+					config,
+					highAvailabilityServices);
 
-				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
-						testActorSystem, jobManger, config, true, true);
+					testActorSystem,
+					highAvailabilityServices,
+					config,
+					true,
+					true);
 
 				final ActorGateway jm = jobManger;
 
@@ -182,6 +193,8 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			} finally {
 				TestingUtils.stopActor(jobManger);
 				TestingUtils.stopActor(taskManager);
+
+				highAvailabilityServices.closeAndCleanupAllData();
 			}
 		}};
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 8af1f46..5ccfe90 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderelection.TestingListener;
@@ -33,7 +35,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.util.TestLogger;
@@ -49,8 +50,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Scanner;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -127,7 +126,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 		ActorSystem[] jobManagerSystem = new ActorSystem[2];
 		WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
-		List<LeaderRetrievalService> leaderRetrievalServices = new ArrayList<>();
+		HighAvailabilityServices highAvailabilityServices = null;
 
 		try (TestingServer zooKeeper = new TestingServer()) {
 			final Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
@@ -141,15 +140,20 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
+			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
 			for (int i = 0; i < jobManagerSystem.length; i++) {
 				jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
 						new Some<>(new Tuple2<String, Object>("localhost", 0)));
 			}
 
 			for (int i = 0; i < webMonitor.length; i++) {
-				LeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService(config);
-				leaderRetrievalServices.add(lrs);
-				webMonitor[i] = new WebRuntimeMonitor(config, lrs, jobManagerSystem[i]);
+				webMonitor[i] = new WebRuntimeMonitor(
+					config,
+					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+					jobManagerSystem[i]);
 			}
 
 			ActorRef[] jobManager = new ActorRef[2];
@@ -164,6 +168,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					jobManagerSystem[i],
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
+					highAvailabilityServices,
 					JobManager.class,
 					MemoryArchivist.class)._1();
 
@@ -171,8 +176,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				webMonitor[i].start(jobManagerAddress[i]);
 			}
 
-			LeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService(config);
-			leaderRetrievalServices.add(lrs);
+			LeaderRetrievalService lrs = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			TestingListener leaderListener = new TestingListener();
 			lrs.start(leaderListener);
 
@@ -247,6 +251,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
 				assertTrue(response.getContent().contains("\"taskmanagers\":1") ||
 						response.getContent().contains("\"taskmanagers\":0"));
+			} finally {
+				lrs.stop();
 			}
 		}
 		finally {
@@ -260,8 +266,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				monitor.stop();
 			}
 
-			for (LeaderRetrievalService lrs : leaderRetrievalServices) {
-				lrs.stop();
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
 			}
 		}
 	}
@@ -462,7 +468,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
 			config,
-			flink.createLeaderRetrievalService(),
+			flink.highAvailabilityServices().getJobManagerLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID),
 			jmActorSystem);
 
 		webMonitor.start(jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 481559b..d60b26f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 
 	@Override
 	public void stop() {
-		client.close();
+		// Nothing to do
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 76d6d86..b570383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -96,14 +96,14 @@ public class JobClient {
 	public static JobListeningContext submitJob(
 			ActorSystem actorSystem,
 			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
+			HighAvailabilityServices highAvailabilityServices,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates,
 			ClassLoader classLoader) {
 
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
-		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+		checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
 		checkNotNull(jobGraph, "The jobGraph must not be null.");
 		checkNotNull(timeout, "The timeout must not be null.");
 
@@ -112,7 +112,7 @@ public class JobClient {
 		// update messages, watches for disconnect between client and JobManager, ...
 
 		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
-			leaderRetrievalService,
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 			timeout,
 			sysoutLogUpdates,
 			config);
@@ -125,11 +125,12 @@ public class JobClient {
 				new Timeout(AkkaUtils.INF_TIMEOUT()));
 
 		return new JobListeningContext(
-				jobGraph.getJobID(),
-				submissionFuture,
-				jobClientActor,
-				timeout,
-				classLoader);
+			jobGraph.getJobID(),
+			submissionFuture,
+			jobClientActor,
+			timeout,
+			classLoader,
+			highAvailabilityServices);
 	}
 
 
@@ -142,7 +143,7 @@ public class JobClient {
 			ActorGateway jobManagerGateWay,
 			Configuration configuration,
 			ActorSystem actorSystem,
-			LeaderRetrievalService leaderRetrievalService,
+			HighAvailabilityServices highAvailabilityServices,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates) {
 
@@ -150,14 +151,14 @@ public class JobClient {
 		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
 		checkNotNull(configuration, "The configuration must not be null.");
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
-		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+		checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
 		checkNotNull(timeout, "The timeout must not be null.");
 
 		// we create a proxy JobClientActor that deals with all communication with
 		// the JobManager. It forwards the job attachments, checks the success/failure responses, logs
 		// update messages, watches for disconnect between client and JobManager, ...
 		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
-			leaderRetrievalService,
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 			timeout,
 			sysoutLogUpdates);
 
@@ -169,12 +170,13 @@ public class JobClient {
 				new Timeout(AkkaUtils.INF_TIMEOUT()));
 
 		return new JobListeningContext(
-				jobID,
-				attachmentFuture,
-				jobClientActor,
-				timeout,
-				actorSystem,
-				configuration);
+			jobID,
+			attachmentFuture,
+			jobClientActor,
+			timeout,
+			actorSystem,
+			configuration,
+			highAvailabilityServices);
 	}
 
 	/**
@@ -357,20 +359,19 @@ public class JobClient {
 	 *
 	 * @param actorSystem The actor system that performs the communication.
 	 * @param config The cluster wide configuration.
-	 * @param leaderRetrievalService Leader retrieval service which used to find the current leading
-	 *                               JobManager
+	 * @param highAvailabilityServices Service factory for high availability services
 	 * @param jobGraph    JobGraph describing the Flink job
 	 * @param timeout     Timeout for futures
 	 * @param sysoutLogUpdates prints log updates to system out if true
 	 * @param classLoader The class loader for deserializing the results
 	 * @return The job execution result
-	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
+	 * @throws JobExecutionException Thrown if the job
 	 *                                                               execution fails.
 	 */
 	public static JobExecutionResult submitJobAndWait(
 			ActorSystem actorSystem,
 			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
+			HighAvailabilityServices highAvailabilityServices,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates,
@@ -379,7 +380,7 @@ public class JobClient {
 		JobListeningContext jobListeningContext = submitJob(
 				actorSystem,
 				config,
-				leaderRetrievalService,
+				highAvailabilityServices,
 				jobGraph,
 				timeout,
 				sysoutLogUpdates,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index a0bf97d..793041f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -167,7 +167,8 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 			JobManagerActorRef msg = (JobManagerActorRef) message;
 			connectToJobManager(msg.jobManager());
 
-			logAndPrintMessage("Connected to JobManager at " + msg.jobManager());
+			logAndPrintMessage("Connected to JobManager at " + msg.jobManager() +
+				" with leader session id " + leaderSessionID + '.');
 
 			connectedToJobManager();
 		}
@@ -326,8 +327,6 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 			getContext().unwatch(jobManager);
 		}
 
-		LOG.info("Connected to new JobManager {}.", jobManager.path());
-
 		this.jobManager = jobManager;
 		getContext().watch(jobManager);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index b944ba8..fe8c34c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.slf4j.Logger;
@@ -47,6 +48,9 @@ public final class JobListeningContext {
 	/** Timeout used Asks */
 	private final FiniteDuration timeout;
 
+	/** Service factory for high availability services */
+	private final HighAvailabilityServices highAvailabilityServices;
+
 	/** ActorSystem for leader retrieval */
 	private ActorSystem actorSystem;
 	/** Flink configuration for initializing the BlobService */
@@ -59,16 +63,18 @@ public final class JobListeningContext {
 	 * Constructor to use when the class loader is available.
 	 */
 	public JobListeningContext(
-		JobID jobID,
-		Future<Object> jobResultFuture,
-		ActorRef jobClientActor,
-		FiniteDuration timeout,
-		ClassLoader classLoader) {
+			JobID jobID,
+			Future<Object> jobResultFuture,
+			ActorRef jobClientActor,
+			FiniteDuration timeout,
+			ClassLoader classLoader,
+			HighAvailabilityServices highAvailabilityServices) {
 		this.jobID = checkNotNull(jobID);
 		this.jobResultFuture = checkNotNull(jobResultFuture);
 		this.jobClientActor = checkNotNull(jobClientActor);
 		this.timeout = checkNotNull(timeout);
 		this.classLoader = checkNotNull(classLoader);
+		this.highAvailabilityServices = checkNotNull(highAvailabilityServices, "highAvailabilityServices");
 	}
 
 	/**
@@ -80,13 +86,15 @@ public final class JobListeningContext {
 		ActorRef jobClientActor,
 		FiniteDuration timeout,
 		ActorSystem actorSystem,
-		Configuration configuration) {
+		Configuration configuration,
+		HighAvailabilityServices highAvailabilityServices) {
 		this.jobID = checkNotNull(jobID);
 		this.jobResultFuture = checkNotNull(jobResultFuture);
 		this.jobClientActor = checkNotNull(jobClientActor);
 		this.timeout = checkNotNull(timeout);
 		this.actorSystem = checkNotNull(actorSystem);
 		this.configuration = checkNotNull(configuration);
+		this.highAvailabilityServices = checkNotNull(highAvailabilityServices, "highAvailabilityServices");
 	}
 
 	/**
@@ -135,7 +143,7 @@ public final class JobListeningContext {
 	private ActorGateway getJobManager() throws JobRetrievalException {
 		try {
 			return LeaderRetrievalUtils.retrieveLeaderGateway(
-				LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true),
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				actorSystem,
 				AkkaUtils.getLookupTimeout(configuration));
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index e356d2b..9bcaa18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -29,8 +29,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.NetUtils;
@@ -165,17 +165,20 @@ public class BootstrapTools {
 
 	/**
 	 * Starts the web frontend.
+	 *
 	 * @param config The Flink config.
+	 * @param highAvailabilityServices Service factory for high availability services
 	 * @param actorSystem The ActorSystem to start the web frontend in.
 	 * @param logger Logger for log output
 	 * @return WebMonitor instance.
 	 * @throws Exception
 	 */
 	public static WebMonitor startWebMonitorIfConfigured(
-				Configuration config,
-				ActorSystem actorSystem,
-				ActorRef jobManager,
-				Logger logger) throws Exception {
+			Configuration config,
+			HighAvailabilityServices highAvailabilityServices,
+			ActorSystem actorSystem,
+			ActorRef jobManager,
+			Logger logger) throws Exception {
 
 
 		// this ensures correct values are present in the web frontend
@@ -186,8 +189,8 @@ public class BootstrapTools {
 		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
 
-			LeaderRetrievalService leaderRetrievalService = 
-				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+			LeaderRetrievalService leaderRetrievalService =
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 
 			// start the web frontend. we need to load this dynamically
 			// because it is not in the same project/dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 3de75d5..69a65dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -56,6 +56,13 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	 */
 	UUID DEFAULT_LEADER_ID = new UUID(0, 0);
 
+	/**
+	 * This JobID should be used to identify the old JobManager when using the
+	 * {@link HighAvailabilityServices}. With Flip-6 every JobManager will have a distinct
+	 * JobID assigned.
+	 */
+	JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
+
 	// ------------------------------------------------------------------------
 	//  Services
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 106be5a..c9e2957 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -115,7 +115,7 @@ public class HighAvailabilityServicesUtils {
 		if (port <= 0 || port >= 65536) {
 			throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
 				"' (port of the JobManager actor system) : " + port +
-				".  it must be great than 0 and less than 65536.");
+				".  it must be greater than 0 and less than 65536.");
 		}
 
 		return Tuple2.of(hostname, port);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2a8af37..2f5cd25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -484,6 +484,15 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Gets the list of assigned user jar paths.
+	 *
+	 * @return The list of assigned user jar paths
+	 */
+	public List<Path> getUserJars() {
+		return userJars;
+	}
+
+	/**
 	 * Adds the BLOB referenced by the key to the JobGraph's dependencies.
 	 *
 	 * @param key

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index aaafa76..2552088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -143,11 +143,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			if (isRunning) {
 				jobGraphListener = null;
 
-				pathCache.close();
-
-				client.close();
-
-				isRunning = false;
+				try {
+					pathCache.close();
+				} catch (Exception e) {
+					throw new Exception("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", e);
+				} finally {
+					isRunning = false;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 0fa6a9e..21e2ca1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -48,6 +49,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
 
+	private final Object lock = new Object();
+
 	/** Client to the ZooKeeper quorum */
 	private final CuratorFramework client;
 
@@ -62,12 +65,12 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
 	private UUID issuedLeaderSessionID;
 
-	private UUID confirmedLeaderSessionID;
+	private volatile UUID confirmedLeaderSessionID;
 
 	/** The leader contender which applies for leadership */
 	private volatile LeaderContender leaderContender;
 
-	private final Object lock = new Object();
+	private volatile boolean running;
 
 	private final ConnectionStateListener listener = new ConnectionStateListener() {
 		@Override
@@ -84,11 +87,17 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	 * @param leaderPath ZooKeeper node path for the node which stores the current leader information
 	 */
 	public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
-		this.client = client;
-		this.leaderPath = leaderPath;
+		this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
+		this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");
 
 		leaderLatch = new LeaderLatch(client, latchPath);
 		cache = new NodeCache(client, leaderPath);
+
+		issuedLeaderSessionID = null;
+		confirmedLeaderSessionID = null;
+		leaderContender = null;
+
+		running = false;
 	}
 
 	/**
@@ -105,32 +114,56 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 		Preconditions.checkNotNull(contender, "Contender must not be null.");
 		Preconditions.checkState(leaderContender == null, "Contender was already set.");
 
-		LOG.info("Starting ZooKeeperLeaderElectionService.");
+		LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
+
+		synchronized (lock) {
 
-		leaderContender = contender;
+			leaderContender = contender;
 
-		leaderLatch.addListener(this);
-		leaderLatch.start();
+			leaderLatch.addListener(this);
+			leaderLatch.start();
 
-		cache.getListenable().addListener(this);
-		cache.start();
+			cache.getListenable().addListener(this);
+			cache.start();
 
-		client.getConnectionStateListenable().addListener(listener);
+			client.getConnectionStateListenable().addListener(listener);
+
+			running = true;
+		}
 	}
 
 	@Override
 	public void stop() throws Exception{
-		LOG.info("Stopping ZooKeeperLeaderElectionService.");
+		synchronized (lock) {
+			if (!running) {
+				return;
+			}
+
+			running = false;
+			confirmedLeaderSessionID = null;
+			issuedLeaderSessionID = null;
+		}
+
+		LOG.info("Stopping ZooKeeperLeaderElectionService {}.", this);
 
 		client.getConnectionStateListenable().removeListener(listener);
 
-		cache.close();
-		leaderLatch.close();
-		client.close();
+		Exception exception = null;
 
-		synchronized (lock) {
-			confirmedLeaderSessionID = null;
-			issuedLeaderSessionID = null;
+		try {
+			cache.close();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
+			leaderLatch.close();
+		} catch(Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw new Exception("Could not properly stop the ZooKeeperLeaderElectionService.", exception);
 		}
 	}
 
@@ -148,9 +181,14 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 		if(leaderLatch.hasLeadership()) {
 			// check if this is an old confirmation call
 			synchronized (lock) {
-				if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
-					confirmedLeaderSessionID = leaderSessionID;
-					writeLeaderInformation(confirmedLeaderSessionID);
+				if (running) {
+					if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
+						confirmedLeaderSessionID = leaderSessionID;
+						writeLeaderInformation(confirmedLeaderSessionID);
+					}
+				} else {
+					LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+						"ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
 				}
 			}
 		} else {
@@ -167,31 +205,41 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	@Override
 	public void isLeader() {
 		synchronized (lock) {
-			issuedLeaderSessionID = UUID.randomUUID();
-			confirmedLeaderSessionID = null;
+			if (running) {
+				issuedLeaderSessionID = UUID.randomUUID();
+				confirmedLeaderSessionID = null;
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(
+						"Grant leadership to contender {} with session ID {}.",
+						leaderContender.getAddress(),
+						issuedLeaderSessionID);
+				}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(
-					"Grant leadership to contender {} with session ID {}.",
-					leaderContender.getAddress(),
-					issuedLeaderSessionID);
+				leaderContender.grantLeadership(issuedLeaderSessionID);
+			} else {
+				LOG.debug("Ignoring the grant leadership notification since the service has " +
+					"already been stopped.");
 			}
-
-			leaderContender.grantLeadership(issuedLeaderSessionID);
 		}
 	}
 
 	@Override
 	public void notLeader() {
 		synchronized (lock) {
-			issuedLeaderSessionID = null;
-			confirmedLeaderSessionID = null;
+			if (running) {
+				issuedLeaderSessionID = null;
+				confirmedLeaderSessionID = null;
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
-			}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
+				}
 
-			leaderContender.revokeLeadership();
+				leaderContender.revokeLeadership();
+			} else {
+				LOG.debug("Ignoring the revoke leadership notification since the service " +
+					"has already been stopped.");
+			}
 		}
 	}
 
@@ -201,53 +249,57 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 			// leaderSessionID is null if the leader contender has not yet confirmed the session ID
 			if (leaderLatch.hasLeadership()) {
 				synchronized (lock) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug(
-							"Leader node changed while {} is the leader with session ID {}.",
-							leaderContender.getAddress(),
-							confirmedLeaderSessionID);
-					}
-
-					if (confirmedLeaderSessionID != null) {
-						ChildData childData = cache.getCurrentData();
+					if (running) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug(
+								"Leader node changed while {} is the leader with session ID {}.",
+								leaderContender.getAddress(),
+								confirmedLeaderSessionID);
+						}
 
-						if (childData == null) {
-							if (LOG.isDebugEnabled()) {
-								LOG.debug(
-									"Writing leader information into empty node by {}.",
-									leaderContender.getAddress());
-							}
-							writeLeaderInformation(confirmedLeaderSessionID);
-						} else {
-							byte[] data = childData.getData();
+						if (confirmedLeaderSessionID != null) {
+							ChildData childData = cache.getCurrentData();
 
-							if (data == null || data.length == 0) {
-								// the data field seems to be empty, rewrite information
+							if (childData == null) {
 								if (LOG.isDebugEnabled()) {
 									LOG.debug(
-										"Writing leader information into node with empty data field by {}.",
+										"Writing leader information into empty node by {}.",
 										leaderContender.getAddress());
 								}
 								writeLeaderInformation(confirmedLeaderSessionID);
 							} else {
-								ByteArrayInputStream bais = new ByteArrayInputStream(data);
-								ObjectInputStream ois = new ObjectInputStream(bais);
-
-								String leaderAddress = ois.readUTF();
-								UUID leaderSessionID = (UUID) ois.readObject();
+								byte[] data = childData.getData();
 
-								if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
-										(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
-									// the data field does not correspond to the expected leader information
+								if (data == null || data.length == 0) {
+									// the data field seems to be empty, rewrite information
 									if (LOG.isDebugEnabled()) {
 										LOG.debug(
-											"Correcting leader information by {}.",
+											"Writing leader information into node with empty data field by {}.",
 											leaderContender.getAddress());
 									}
 									writeLeaderInformation(confirmedLeaderSessionID);
+								} else {
+									ByteArrayInputStream bais = new ByteArrayInputStream(data);
+									ObjectInputStream ois = new ObjectInputStream(bais);
+
+									String leaderAddress = ois.readUTF();
+									UUID leaderSessionID = (UUID) ois.readObject();
+
+									if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
+										(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
+										// the data field does not correspond to the expected leader information
+										if (LOG.isDebugEnabled()) {
+											LOG.debug(
+												"Correcting leader information by {}.",
+												leaderContender.getAddress());
+										}
+										writeLeaderInformation(confirmedLeaderSessionID);
+									}
 								}
 							}
 						}
+					} else {
+						LOG.debug("Ignoring node change notification since the service has already been stopped.");
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index f74fb1a..7f18fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Objects;
 import java.util.UUID;
@@ -43,6 +44,8 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 	private static final Logger LOG = LoggerFactory.getLogger(
 		ZooKeeperLeaderRetrievalService.class);
 
+	private final Object lock = new Object();
+
 	/** Connection to the used ZooKeeper quorum */
 	private final CuratorFramework client;
 
@@ -53,8 +56,11 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 	private volatile LeaderRetrievalListener leaderListener;
 
 	private String lastLeaderAddress;
+
 	private UUID lastLeaderSessionID;
 
+	private volatile boolean running;
+
 	private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
 		@Override
 		public void stateChanged(CuratorFramework client, ConnectionState newState) {
@@ -69,8 +75,14 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 	 * @param retrievalPath Path of the ZooKeeper node which contains the leader information
 	 */
 	public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
-		this.client = client;
+		this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
 		this.cache = new NodeCache(client, retrievalPath);
+
+		this.leaderListener = null;
+		this.lastLeaderAddress = null;
+		this.lastLeaderSessionID = null;
+
+		running = false;
 	}
 
 	@Override
@@ -81,65 +93,87 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 
 		LOG.info("Starting ZooKeeperLeaderRetrievalService.");
 
-		leaderListener = listener;
+		synchronized (lock) {
+			leaderListener = listener;
+
+			cache.getListenable().addListener(this);
+			cache.start();
 
-		cache.getListenable().addListener(this);
-		cache.start();
+			client.getConnectionStateListenable().addListener(connectionStateListener);
 
-		client.getConnectionStateListenable().addListener(connectionStateListener);
+			running = true;
+		}
 	}
 
 	@Override
 	public void stop() throws Exception {
 		LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
 
+		synchronized (lock) {
+			if (!running) {
+				return;
+			}
+
+			running = false;
+		}
+
 		client.getConnectionStateListenable().removeListener(connectionStateListener);
 
-		cache.close();
+		try {
+			cache.close();
+		} catch (IOException e) {
+			throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
+		}
 	}
 
 	@Override
 	public void nodeChanged() throws Exception {
-		try {
-			LOG.debug("Leader node has changed.");
-
-			ChildData childData = cache.getCurrentData();
-
-			String leaderAddress;
-			UUID leaderSessionID;
-
-			if (childData == null) {
-				leaderAddress = null;
-				leaderSessionID = null;
-			} else {
-				byte[] data = childData.getData();
-
-				if (data == null || data.length == 0) {
-					leaderAddress = null;
-					leaderSessionID = null;
-				} else {
-					ByteArrayInputStream bais = new ByteArrayInputStream(data);
-					ObjectInputStream ois = new ObjectInputStream(bais);
-
-					leaderAddress = ois.readUTF();
-					leaderSessionID = (UUID) ois.readObject();
+		synchronized (lock) {
+			if (running) {
+				try {
+					LOG.debug("Leader node has changed.");
+
+					ChildData childData = cache.getCurrentData();
+
+					String leaderAddress;
+					UUID leaderSessionID;
+
+					if (childData == null) {
+						leaderAddress = null;
+						leaderSessionID = null;
+					} else {
+						byte[] data = childData.getData();
+
+						if (data == null || data.length == 0) {
+							leaderAddress = null;
+							leaderSessionID = null;
+						} else {
+							ByteArrayInputStream bais = new ByteArrayInputStream(data);
+							ObjectInputStream ois = new ObjectInputStream(bais);
+
+							leaderAddress = ois.readUTF();
+							leaderSessionID = (UUID) ois.readObject();
+						}
+					}
+
+					if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
+						Objects.equals(leaderSessionID, lastLeaderSessionID))) {
+						LOG.debug(
+							"New leader information: Leader={}, session ID={}.",
+							leaderAddress,
+							leaderSessionID);
+
+						lastLeaderAddress = leaderAddress;
+						lastLeaderSessionID = leaderSessionID;
+						leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
+					}
+				} catch (Exception e) {
+					leaderListener.handleError(new Exception("Could not handle node changed event.", e));
+					throw e;
 				}
+			} else {
+				LOG.debug("Ignoring node change notification since the service has already been stopped.");
 			}
-
-			if(!(Objects.equals(leaderAddress, lastLeaderAddress) &&
-					Objects.equals(leaderSessionID, lastLeaderSessionID))) {
-				LOG.debug(
-					"New leader information: Leader={}, session ID={}.",
-					leaderAddress,
-					leaderSessionID);
-
-				lastLeaderAddress = leaderAddress;
-				lastLeaderSessionID = leaderSessionID;
-				leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
-			}
-		} catch (Exception e) {
-			leaderListener.handleError(new Exception("Could not handle node changed event.", e));
-			throw e;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
new file mode 100644
index 0000000..07178fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Mini cluster to run the old JobManager code without embedded high availability services. This
+ * class has been implemented because the normal {@link FlinkMiniCluster} has been changed to use
+ * the {@link HighAvailabilityServices}. With this change we can no longer use the
+ * {@link org.apache.flink.api.java.RemoteEnvironment} to connect against the
+ * {@link FlinkMiniCluster}, because the remote environment cannot retrieve the current leader
+ * session id.
+ */
+public class StandaloneMiniCluster {
+
+	private static final String LOCAL_HOSTNAME = "localhost";
+
+	private final Configuration configuration;
+
+	private final ActorSystem actorSystem;
+
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	private final HighAvailabilityServices highAvailabilityServices;
+
+	private final FiniteDuration timeout;
+
+	private final int port;
+
+	public StandaloneMiniCluster(Configuration configuration) throws Exception {
+		this.configuration = Preconditions.checkNotNull(configuration);
+
+		timeout = AkkaUtils.getTimeout(configuration);
+
+		actorSystem = JobManager.startActorSystem(
+			configuration,
+			LOCAL_HOSTNAME,
+			0);
+
+		port = configuration.getInteger(JobManagerOptions.PORT);
+
+		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+		highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			configuration,
+			Executors.directExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+
+		JobManager.startJobManagerActors(
+			configuration,
+			actorSystem,
+			scheduledExecutorService,
+			scheduledExecutorService,
+			highAvailabilityServices,
+			JobManager.class,
+			MemoryArchivist.class);
+
+		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
+			configuration,
+			ResourceID.generate(),
+			actorSystem,
+			highAvailabilityServices,
+			LOCAL_HOSTNAME,
+			Option.<String>empty(),
+			true,
+			TaskManager.class);
+
+		Future<Object> registrationFuture = Patterns.ask(
+			taskManager,
+			TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+			timeout.toMillis());
+
+		Await.ready(registrationFuture, timeout);
+	}
+
+	public String getHostname() {
+		return LOCAL_HOSTNAME;
+	}
+
+	public int getPort() {
+		return port;
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public void close() throws Exception {
+		Exception exception = null;
+
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+
+		try {
+			highAvailabilityServices.closeAndCleanupAllData();
+		} catch (Exception e) {
+			exception = e;
+		}
+
+		scheduledExecutorService.shutdownNow();
+
+		try {
+			scheduledExecutorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index f9cf01d..9b05273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,14 +102,16 @@ public class QueryableStateClient {
 	 * system and another for the network client.
 	 *
 	 * @param config Configuration to use.
+	 * @param highAvailabilityServices Service factory for high availability services
 	 * @throws Exception Failures are forwarded
 	 */
-	public QueryableStateClient(Configuration config) throws Exception {
+	public QueryableStateClient(
+			Configuration config,
+			HighAvailabilityServices highAvailabilityServices) throws Exception {
 		Preconditions.checkNotNull(config, "Configuration");
 
 		// Create a leader retrieval service
-		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils
-				.createLeaderRetrievalService(config, true);
+		LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 
 		// Get the ask timeout
 		String askTimeoutString = config.getString(

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index eab4de8..8789eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -112,10 +112,12 @@ public class AkkaRpcServiceUtils {
 
 	/**
 	 *
-	 * @param hostname     The hostname or address where the target RPC service is listening.
-	 * @param port         The port where the target RPC service is listening.
+	 * @param hostname The hostname or address where the target RPC service is listening.
+	 * @param port The port where the target RPC service is listening.
 	 * @param endpointName The name of the RPC endpoint.
-	 * @param config       The configuration from which to deduce further settings.
+	 * @param addressResolution Whether to try address resolution of the given hostname or not.
+	 *                          This allows failing fast if the hostname cannot be resolved.
+	 * @param config The configuration from which to deduce further settings.
 	 *
 	 * @return The RPC URL of the specified RPC endpoint.
 	 */
@@ -143,10 +145,12 @@ public class AkkaRpcServiceUtils {
 
 	/**
 	 * 
-	 * @param hostname     The hostname or address where the target RPC service is listening.
-	 * @param port         The port where the target RPC service is listening.
+	 * @param hostname The hostname or address where the target RPC service is listening.
+	 * @param port The port where the target RPC service is listening.
 	 * @param endpointName The name of the RPC endpoint.
-	 * @param akkaProtocol       True, if security/encryption is enabled, false otherwise.
+	 * @param addressResolution Whether to try address resolution of the given hostname or not.
+	 *                          This allows failing fast if the hostname cannot be resolved.
+	 * @param akkaProtocol True, if security/encryption is enabled, false otherwise.
 	 * 
 	 * @return The RPC URL of the specified RPC endpoint.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 05749c4..073c52b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,57 +50,6 @@ import java.util.UUID;
 public class LeaderRetrievalUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(LeaderRetrievalUtils.class);
-
-	/**
-	 * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
-	 *
-	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
-	 * @param resolveInitialHostName If true, resolves the initial hostname
-	 * @return The {@link LeaderRetrievalService} specified in the configuration object
-	 * @throws Exception
-	 */
-	public static LeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration,
-			boolean resolveInitialHostName)
-		throws Exception {
-
-		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
-
-		switch (highAvailabilityMode) {
-			case NONE:
-				return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName, null);
-			case ZOOKEEPER:
-				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
-			default:
-				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
-		}
-	}
-
-	/**
-	 * Creates a {@link LeaderRetrievalService} that either uses the distributed leader election
-	 * configured in the configuration, or, in standalone mode, the given actor reference.
-	 *
-	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
-	 * @param standaloneRef Actor reference to be used in standalone mode. 
-	 *                      
-	 * @return The {@link LeaderRetrievalService} specified in the configuration object
-	 * @throws Exception
-	 */
-	public static LeaderRetrievalService createLeaderRetrievalService(
-				Configuration configuration, ActorRef standaloneRef) throws Exception {
-
-		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
-
-		switch (highAvailabilityMode) {
-			case NONE:
-				String akkaUrl = standaloneRef.path().toSerializationFormat();
-				return new StandaloneLeaderRetrievalService(akkaUrl);
-			case ZOOKEEPER:
-				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
-			default:
-				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
-		}
-	}
 	
 	/**
 	 * Retrieves the current leader gateway using the given {@link LeaderRetrievalService}. If the
@@ -173,6 +121,21 @@ public class LeaderRetrievalUtils {
 		}
 	}
 
+	/**
+	 * Retrieves the current leader session id of the component identified by the given leader
+	 * retrieval service.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to be used for the leader retrieval
+	 * @param timeout Timeout for the leader retrieval
+	 * @return The leader session id of the retrieved leader
+	 * @throws LeaderRetrievalException if the leader retrieval operation fails (including a timeout)
+	 */
+	public static UUID retrieveLeaderSessionId(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout) throws LeaderRetrievalException {
+		return retrieveLeaderConnectionInfo(leaderRetrievalService, timeout).getLeaderSessionID();
+	}
+
 	public static InetAddress findConnectingAddress(
 		LeaderRetrievalService leaderRetrievalService,
 		Time timeout) throws LeaderRetrievalException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index ed485d0..f6f08e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -135,6 +135,10 @@ public class SerializedThrowable extends Exception implements Serializable {
 		return cached;
 	}
 
+	public String getOriginalErrorClassName() {
+		return originalErrorClassName;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Override the behavior of Throwable
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 1b73dc7..9ade5ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -159,18 +159,6 @@ public class ZooKeeperUtils {
 	/**
 	 * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
 	 *
-	 * @param configuration {@link Configuration} object containing the configuration values
-	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
-	 */
-	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration) throws Exception {
-		final CuratorFramework client = startCuratorFramework(configuration);
-		return createLeaderRetrievalService(client, configuration);
-	}
-
-	/**
-	 * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
-	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
@@ -207,21 +195,6 @@ public class ZooKeeperUtils {
 	}
 
 	/**
-	 * Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link
-	 * CuratorFramework} client.
-	 *
-	 * @param configuration {@link Configuration} object containing the configuration values
-	 * @return {@link ZooKeeperLeaderElectionService} instance.
-	 */
-	public static ZooKeeperLeaderElectionService createLeaderElectionService(
-			Configuration configuration) throws Exception {
-
-		CuratorFramework client = startCuratorFramework(configuration);
-
-		return createLeaderElectionService(client, configuration);
-	}
-
-	/**
 	 * Creates a {@link ZooKeeperLeaderElectionService} instance.
 	 *
 	 * @param client        The {@link CuratorFramework} ZooKeeper client to use