You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:13 UTC

[08/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 195baa1..c8459e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import com.typesafe.config.Config;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -55,8 +59,6 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
@@ -95,16 +97,15 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentCaptor;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -117,6 +118,7 @@ import scala.reflect.ClassTag$;
 import java.io.File;
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -127,15 +129,14 @@ import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.Al
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
 
@@ -144,6 +145,8 @@ public class JobManagerTest extends TestLogger {
 
 	private static ActorSystem system;
 
+	private HighAvailabilityServices highAvailabilityServices;
+
 	@BeforeClass
 	public static void setup() {
 		system = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -154,6 +157,17 @@ public class JobManagerTest extends TestLogger {
 		JavaTestKit.shutdownActorSystem(system);
 	}
 
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		highAvailabilityServices.closeAndCleanupAllData();
+		highAvailabilityServices = null;
+	}
+
 	@Test
 	public void testNullHostnameGoesToLocalhost() {
 		try {
@@ -589,32 +603,36 @@ public class JobManagerTest extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
 
+		ActorRef jobManagerActor = JobManager.startJobManagerActors(
+			config,
+			system,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			highAvailabilityServices,
+			TestingJobManager.class,
+			MemoryArchivist.class)._1();
+
+		UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+			TestingUtils.TESTING_TIMEOUT());
+
 		ActorGateway jobManager = new AkkaActorGateway(
-				JobManager.startJobManagerActors(
-					config,
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					TestingJobManager.class,
-					MemoryArchivist.class)._1(),
-				HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-		LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
-				AkkaUtils.getAkkaURL(system, jobManager.actor()));
+				jobManagerActor,
+				leaderId);
 
 		Configuration tmConfig = new Configuration();
 		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
-				tmConfig,
-				ResourceID.generate(),
-				system,
-				"localhost",
-				scala.Option.<String>empty(),
-				scala.Option.apply(leaderRetrievalService),
-				true,
-				TestingTaskManager.class);
+			tmConfig,
+			ResourceID.generate(),
+			system,
+			highAvailabilityServices,
+			"localhost",
+			scala.Option.<String>empty(),
+			true,
+			TestingTaskManager.class);
 
 		Future<Object> registrationFuture = jobManager
 				.ask(new NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
@@ -783,6 +801,7 @@ public class JobManagerTest extends TestLogger {
 		// Wait for failure
 		JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
 		assertEquals(JobStatus.FAILED, jobStatus.state());
+
 	}
 
 	@Test
@@ -805,25 +824,30 @@ public class JobManagerTest extends TestLogger {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				TestingUtils.TESTING_TIMEOUT());
+
+			jobManager = new AkkaActorGateway(master._1(), leaderId);
+			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					actorSystem,
-					"localhost",
-					Option.apply("tm"),
-					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
-					true,
-					TestingTaskManager.class);
+				config,
+				ResourceID.generate(),
+				actorSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.apply("tm"),
+				true,
+				TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+			taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -907,6 +931,10 @@ public class JobManagerTest extends TestLogger {
 			if (taskManager != null) {
 				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (actorSystem != null) {
+				actorSystem.awaitTermination(TESTING_TIMEOUT());
+			}
 		}
 	}
 
@@ -931,25 +959,30 @@ public class JobManagerTest extends TestLogger {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				TestingUtils.TESTING_TIMEOUT());
+
+			jobManager = new AkkaActorGateway(master._1(), leaderId);
+			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 				config,
 				ResourceID.generate(),
 				actorSystem,
+				highAvailabilityServices,
 				"localhost",
 				Option.apply("tm"),
-				Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
 				true,
 				TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+			taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1037,25 +1070,30 @@ public class JobManagerTest extends TestLogger {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				TestingUtils.TESTING_TIMEOUT());
+
+			jobManager = new AkkaActorGateway(master._1(), leaderId);
+			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					actorSystem,
-					"localhost",
-					Option.apply("tm"),
-					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
-					true,
-					TestingTaskManager.class);
+				config,
+				ResourceID.generate(),
+				actorSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.apply("tm"),
+				true,
+				TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+			taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1115,6 +1153,10 @@ public class JobManagerTest extends TestLogger {
 			if (taskManager != null) {
 				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (actorSystem != null) {
+				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+			}
 		}
 	}
 
@@ -1137,28 +1179,33 @@ public class JobManagerTest extends TestLogger {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				Option.apply("jm"),
 				Option.apply("arch"),
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				TestingUtils.TESTING_TIMEOUT());
+
+			jobManager = new AkkaActorGateway(master._1(), leaderId);
+			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			Configuration tmConfig = new Configuration();
 			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-					tmConfig,
-					ResourceID.generate(),
-					actorSystem,
-					"localhost",
-					Option.apply("tm"),
-					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
-					true,
-					TestingTaskManager.class);
+				tmConfig,
+				ResourceID.generate(),
+				actorSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.apply("tm"),
+				true,
+				TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+			taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1274,6 +1321,10 @@ public class JobManagerTest extends TestLogger {
 			if (taskManager != null) {
 				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (actorSystem != null) {
+				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+			}
 		}
 	}
 
@@ -1298,7 +1349,8 @@ public class JobManagerTest extends TestLogger {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
-				configuration);
+				configuration,
+				highAvailabilityServices);
 
 			final TestProbe probe = TestProbe.apply(actorSystem);
 			final AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
@@ -1311,7 +1363,7 @@ public class JobManagerTest extends TestLogger {
 
 			JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
 
-			assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, leaderSessionMessage.leaderSessionID());
+			assertEquals(jmGateway.leaderSessionID(), leaderSessionMessage.leaderSessionID());
 			assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful);
 
 			jmGateway.tell(

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index fcca173..37f503f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -68,6 +70,7 @@ public class JobSubmitTest {
 	private static ActorSystem jobManagerSystem;
 	private static ActorGateway jmGateway;
 	private static Configuration jmConfig;
+	private static HighAvailabilityServices highAvailabilityServices;
 
 	@BeforeClass
 	public static void setupJobManager() {
@@ -81,17 +84,20 @@ public class JobSubmitTest {
 		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
 		jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress);
 
+		highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
 		// only start JobManager (no ResourceManager)
 		JobManager.startJobManagerActors(
 			jmConfig,
 			jobManagerSystem,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
+			highAvailabilityServices,
 			JobManager.class,
 			MemoryArchivist.class)._1();
 
 		try {
-			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig, false);
+			LeaderRetrievalService lrs = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 
 			jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
 					lrs,
@@ -104,10 +110,15 @@ public class JobSubmitTest {
 	}
 
 	@AfterClass
-	public static void teardownJobmanager() {
+	public static void teardownJobmanager() throws Exception {
 		if (jobManagerSystem != null) {
 			jobManagerSystem.shutdown();
 		}
+
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+			highAvailabilityServices = null;
+		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index d2221c5..753c797 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -56,7 +57,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobManagerRunner.class)
-public class JobManagerRunnerMockTest {
+public class JobManagerRunnerMockTest extends TestLogger {
 
 	private JobManagerRunner runner;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index b1bb548..942fcf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -28,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.util.TestLogger;
 
@@ -52,11 +57,15 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 	private int numSlotsPerTM = 1;
 	private int parallelism = numTMs * numSlotsPerTM;
 
-	private LeaderElectionRetrievalTestingCluster cluster = null;
+	private JobID jobId;
+	private TestingCluster cluster = null;
 	private JobGraph job = createBlockingJob(parallelism);
+	private TestingManualHighAvailabilityServices highAvailabilityServices;
 
 	@Before
 	public void before() throws TimeoutException, InterruptedException {
+		jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
+
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 
 		Configuration configuration = new Configuration();
@@ -69,7 +78,13 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
 		configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 milli");
 
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
+		highAvailabilityServices = new TestingManualHighAvailabilityServices();
+
+		cluster = new TestingCluster(
+			configuration,
+			highAvailabilityServices,
+			true,
+			false);
 		cluster.start(false);
 
 		// wait for actors to be alive so that they have started their leader retrieval service
@@ -86,8 +101,15 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 	public void testNotRestartedWhenLosingLeadership() throws Exception {
 		UUID leaderSessionID = UUID.randomUUID();
 
-		cluster.grantLeadership(0, leaderSessionID);
-		cluster.notifyRetrievalListeners(0, leaderSessionID);
+		highAvailabilityServices.grantLeadership(
+			jobId,
+			0,
+			leaderSessionID);
+
+		highAvailabilityServices.notifyRetrievers(
+			jobId,
+			0,
+			leaderSessionID);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -108,7 +130,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-		cluster.revokeLeadership();
+		highAvailabilityServices.revokeLeadership(jobId);
 
 		executionGraph.getTerminationFuture().get(30, TimeUnit.SECONDS);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 7ae9974..bbcbbf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -28,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -58,12 +62,16 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 	private int numSlotsPerTM = 2;
 	private int parallelism = numTMs * numSlotsPerTM;
 
+	private JobID jobId;
 	private Configuration configuration;
-	private LeaderElectionRetrievalTestingCluster cluster = null;
+	private TestingManualHighAvailabilityServices highAvailabilityServices;
+	private TestingCluster cluster = null;
 	private JobGraph job = createBlockingJob(parallelism);
 
 	@Before
 	public void before() throws Exception {
+		jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
+
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 
 		configuration = new Configuration();
@@ -72,7 +80,13 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
+		highAvailabilityServices = new TestingManualHighAvailabilityServices();
+
+		cluster = new TestingCluster(
+			configuration,
+			highAvailabilityServices,
+			true,
+			false);
 		cluster.start(false); // TaskManagers don't have to register at the JobManager
 
 		cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
@@ -96,9 +110,9 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		UUID leaderSessionID2 = UUID.randomUUID();
 
 		// first make JM(0) the leader
-		cluster.grantLeadership(0, leaderSessionID1);
+		highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID1);
 		// notify all listeners
-		cluster.notifyRetrievalListeners(0, leaderSessionID1);
+		highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID1);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -114,9 +128,9 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
 
 		// make the JM(1) the new leader
-		cluster.grantLeadership(1, leaderSessionID2);
+		highAvailabilityServices.grantLeadership(jobId, 1, leaderSessionID2);
 		// notify all listeners about the event
-		cluster.notifyRetrievalListeners(1, leaderSessionID2);
+		highAvailabilityServices.notifyRetrievers(jobId, 1, leaderSessionID2);
 
 		Await.ready(jobRemoval, timeout);
 
@@ -133,7 +147,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		// try to resubmit now the non-blocking job, it should complete successfully
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
-		cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(jm2.path(), jm2.leaderSessionID()));
+		cluster.submitJobAndWait(job, false, timeout);
 	}
 
 	/**
@@ -146,8 +160,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		UUID leaderSessionID = UUID.randomUUID();
 		UUID newLeaderSessionID = UUID.randomUUID();
 
-		cluster.grantLeadership(0, leaderSessionID);
-		cluster.notifyRetrievalListeners(0, leaderSessionID);
+		highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+		highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -163,7 +177,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
 
 		// only notify the JMs about the new leader JM(1)
-		cluster.grantLeadership(1, newLeaderSessionID);
+		highAvailabilityServices.grantLeadership(jobId, 1, newLeaderSessionID);
 
 		// job should be removed anyway
 		Await.ready(jobRemoval, timeout);
@@ -179,8 +193,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		UUID leaderSessionID = UUID.randomUUID();
 		UUID newLeaderSessionID = UUID.randomUUID();
 
-		cluster.grantLeadership(0, leaderSessionID);
-		cluster.notifyRetrievalListeners(0, leaderSessionID);
+		highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+		highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -196,7 +210,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
 
 		// notify listeners (TMs) about the leader change
-		cluster.notifyRetrievalListeners(1, newLeaderSessionID);
+		highAvailabilityServices.notifyRetrievers(jobId, 1, newLeaderSessionID);
 
 		Await.ready(jobRemoval, timeout);
 	}
@@ -213,8 +227,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		FiniteDuration shortTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
-		cluster.grantLeadership(0, leaderSessionID);
-		cluster.notifyRetrievalListeners(0, leaderSessionID);
+		highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+		highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -232,7 +246,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		LOG.info("Make JM(0) again the leader. This should first revoke the leadership.");
 
 		// make JM(0) again the leader --> this implies first a leadership revocation
-		cluster.grantLeadership(0, newLeaderSessionID);
+		highAvailabilityServices.grantLeadership(jobId, 0, newLeaderSessionID);
 
 		Await.ready(jobRemoval, timeout);
 
@@ -250,7 +264,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		LOG.info("Notify TMs about the new (old) leader.");
 
 		// notify the TMs about the new (old) leader
-		cluster.notifyRetrievalListeners(0, newLeaderSessionID);
+		highAvailabilityServices.notifyRetrievers(jobId,0, newLeaderSessionID);
 
 		cluster.waitForTaskManagersToBeRegistered(timeout);
 
@@ -258,7 +272,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		// try to resubmit now the non-blocking job, it should complete successfully
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
-		cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(leaderGateway.path(), leaderGateway.leaderSessionID()));
+		cluster.submitJobAndWait(job, false, timeout);
 	}
 
 	public JobGraph createBlockingJob(int parallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
deleted file mode 100644
index 1cab0ea..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A testing cluster which allows to manually trigger grantLeadership and notifyRetrievalListener
- * events. The grantLeadership event assigns the specified JobManager the leadership. The
- * notifyRetrievalListener notifies all listeners that the specified JobManager (index) has been
- * granted the leadership.
- */
-public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
-
-	private final Configuration userConfiguration;
-	private final boolean useSingleActorSystem;
-
-	public List<TestingLeaderElectionService> leaderElectionServices;
-	public List<TestingLeaderRetrievalService> leaderRetrievalServices;
-
-	private int leaderIndex = -1;
-
-	public LeaderElectionRetrievalTestingCluster(
-			Configuration userConfiguration,
-			boolean singleActorSystem,
-			boolean synchronousDispatcher) {
-		super(userConfiguration, singleActorSystem, synchronousDispatcher);
-
-		this.userConfiguration = userConfiguration;
-		this.useSingleActorSystem = singleActorSystem;
-
-		leaderElectionServices = new ArrayList<>();
-		leaderRetrievalServices = new ArrayList<>();
-	}
-
-	@Override
-	public Configuration userConfiguration() {
-		return this.userConfiguration;
-	}
-
-	@Override
-	public boolean useSingleActorSystem() {
-		return useSingleActorSystem;
-	}
-
-	@Override
-	public Option<LeaderElectionService> createLeaderElectionService() {
-		leaderElectionServices.add(new TestingLeaderElectionService());
-
-		LeaderElectionService result = leaderElectionServices.get(leaderElectionServices.size() - 1);
-
-		return Option.apply(result);
-	}
-
-	@Override
-	public LeaderRetrievalService createLeaderRetrievalService() {
-		leaderRetrievalServices.add(new TestingLeaderRetrievalService(
-			null,
-			null));
-
-		return leaderRetrievalServices.get(leaderRetrievalServices.size() - 1);
-	}
-
-	@Override
-	public int getNumberOfJobManagers() {
-		return this.originalConfiguration().getInteger(
-				ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
-				ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
-	}
-
-	public void grantLeadership(int index, UUID leaderSessionID) {
-		if(leaderIndex >= 0) {
-			// first revoke leadership
-			leaderElectionServices.get(leaderIndex).notLeader();
-		}
-
-		// make the JM with index the new leader
-		leaderElectionServices.get(index).isLeader(leaderSessionID);
-
-		leaderIndex = index;
-	}
-
-	public void notifyRetrievalListeners(int index, UUID leaderSessionID) {
-		String address = jobManagerActors().get().apply(index).path().toString();
-
-		for(TestingLeaderRetrievalService service: leaderRetrievalServices) {
-			service.notifyListener(address, leaderSessionID);
-		}
-	}
-
-	public void revokeLeadership() {
-		if (leaderIndex >= 0) {
-			leaderElectionServices.get(leaderIndex).notLeader();
-			leaderIndex = -1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 16779fa..d456083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -63,4 +63,8 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 		contender = null;
 		hasLeadership  = false;
 	}
+
+	public String getAddress() {
+		return contender.getAddress();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index 887772a..15d3bde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -35,6 +35,10 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
 	private volatile LeaderRetrievalListener listener;
 
+	public TestingLeaderRetrievalService() {
+		this(null, null);
+	}
+
 	public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) {
 		this.leaderAddress = leaderAddress;
 		this.leaderSessionID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index bc8c0b60..6efd270 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -37,6 +36,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -48,6 +48,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -59,6 +60,11 @@ import static org.junit.Assert.*;
 
 public class ZooKeeperLeaderElectionTest extends TestLogger {
 	private TestingServer testingServer;
+
+	private Configuration configuration;
+
+	private CuratorFramework client;
+
 	private static final String TEST_URL = "akka//user/jobmanager";
 	private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
 
@@ -71,17 +77,26 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		} catch (Exception e) {
 			throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
 		}
+
+		configuration = new Configuration();
+
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		client = ZooKeeperUtils.startCuratorFramework(configuration);
 	}
 
 	@After
-	public void after() {
-		try {
-			testingServer.stop();
-		} catch (Exception e) {
-			throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
+	public void after() throws IOException {
+		if (client != null) {
+			client.close();
+			client = null;
 		}
 
-		testingServer = null;
+		if (testingServer != null) {
+			testingServer.stop();
+			testingServer = null;
+		}
 	}
 
 	/**
@@ -89,16 +104,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
 
 		try {
-			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
 
 			TestingContender contender = new TestingContender(TEST_URL, leaderElectionService);
 			TestingListener listener = new TestingListener();
@@ -134,10 +145,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testZooKeeperReelection() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
 		int num = 10;
@@ -149,14 +156,14 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		TestingListener listener = new TestingListener();
 
 		try {
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
 
 			LOG.debug("Start leader retrieval service for the TestingListener.");
 
 			leaderRetrievalService.start(listener);
 
 			for (int i = 0; i < num; i++) {
-				leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration);
+				leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
 				contenders[i] = new TestingContender(TEST_URL + "_" + i, leaderElectionService[i]);
 
 				LOG.debug("Start leader election service for contender #{}.", i);
@@ -217,10 +224,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
 		int num = 3;
 		int numTries = 30;
 
@@ -231,12 +234,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		TestingListener listener = new TestingListener();
 
 		try {
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
 
 			leaderRetrievalService.start(listener);
 
 			for (int i = 0; i < num; i++) {
-				leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration);
+				leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
 				contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]);
 
 				leaderElectionService[i].start(contenders[i]);
@@ -261,7 +264,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 					// stop leader election service = revoke leadership
 					leaderElectionService[index].stop();
 					// create new leader election service which takes part in the leader election
-					leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(configuration);
+					leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
 					contenders[index] = new TestingContender(
 							TEST_URL + "_" + index + "_" + (lastTry + 1),
 							leaderElectionService[index]);
@@ -295,9 +298,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String FAULTY_CONTENDER_URL = "faultyContender";
 		final String leaderPath = "/leader";
 
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -308,9 +308,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		TestingContender contender;
 
 		try {
-			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
-			leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
+			leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
 
 			contender = new TestingContender(TEST_URL, leaderElectionService);
 
@@ -379,18 +379,13 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testExceptionForwarding() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
 		TestingListener listener = new TestingListener();
 		TestingContender testingContender;
 
 		CuratorFramework client;
-		final CreateBuilder mockCreateBuilder = mock(CreateBuilder.class);
-		final ProtectACLCreateModePathAndBytesable<String> mockCreateParentsIfNeeded = mock (ProtectACLCreateModePathAndBytesable.class);
+		final CreateBuilder mockCreateBuilder = mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
 		final Exception testException = new Exception("Test exception");
 
 		try {
@@ -414,12 +409,14 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 			doAnswer(answer).when(client).create();
 
-			when(mockCreateBuilder.creatingParentsIfNeeded()).thenReturn(mockCreateParentsIfNeeded);
-			when(mockCreateParentsIfNeeded.withMode(Matchers.any(CreateMode.class))).thenReturn(mockCreateParentsIfNeeded);
-			when(mockCreateParentsIfNeeded.forPath(Matchers.any(String.class),  Matchers.any(byte[].class))).thenThrow(testException);
+			when(
+				mockCreateBuilder
+				.creatingParentsIfNeeded()
+				.withMode(Matchers.any(CreateMode.class))
+				.forPath(anyString(), any(byte[].class))).thenThrow(testException);
 
 			leaderElectionService = new ZooKeeperLeaderElectionService(client, "/latch", "/leader");
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
 
 			testingContender = new TestingContender(TEST_URL, leaderElectionService);
 
@@ -442,34 +439,33 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that there is no information left in the ZooKeeper cluster after all JobManagers
-	 * have terminated. In other words, checks that the ZooKeeperLeaderElection service uses
+	 * Tests that there is no information left in the ZooKeeper cluster after the ZooKeeper client
+	 * has terminated. In other words, checks that the ZooKeeperLeaderElection service uses
 	 * ephemeral nodes.
 	 */
 	@Test
 	public void testEphemeralZooKeeperNodes() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
 		TestingContender testingContender;
 		TestingListener listener;
 
 		CuratorFramework client = null;
+		CuratorFramework client2 = null;
 		NodeCache cache = null;
 
 		try {
-			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			client = ZooKeeperUtils.startCuratorFramework(configuration);
+			client2 = ZooKeeperUtils.startCuratorFramework(configuration);
+
+			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client2, configuration);
 			testingContender = new TestingContender(TEST_URL, leaderElectionService);
 			listener = new TestingListener();
 
-			client = ZooKeeperUtils.startCuratorFramework(configuration);
 			final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
 					ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
-			cache = new NodeCache(client, leaderPath);
+			cache = new NodeCache(client2, leaderPath);
 
 			ExistsCacheListener existsListener = new ExistsCacheListener(cache);
 			DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
@@ -489,6 +485,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 			leaderElectionService.stop();
 
+			// now stop the underlying client
+			client.close();
+
 			Future<Boolean> deletedFuture = deletedCacheListener.nodeDeleted();
 
 			// make sure that the leader node has been deleted
@@ -497,7 +496,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			leaderRetrievalService.start(listener);
 
 			try {
-				listener.waitForNewLeader(1000);
+				listener.waitForNewLeader(1000L);
 
 				fail("TimeoutException was expected because there is no leader registered and " +
 						"thus there shouldn't be any leader information in ZooKeeper.");
@@ -513,8 +512,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 				cache.close();
 			}
 
-			if (client != null) {
-				client.close();
+			if (client2 != null) {
+				client2.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index b79093f..0ea47f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,18 +18,17 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -53,29 +52,41 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 	private TestingServer testingServer;
 
+	private Configuration config;
+
+	private HighAvailabilityServices highAvailabilityServices;
+
 	@Before
-	public void before() {
-		try {
-			testingServer = new TestingServer();
-		} catch (Exception e) {
-			throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
-		}
+	public void before() throws Exception {
+		testingServer = new TestingServer();
+
+		config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+
+		highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 	}
 
 	@After
-	public void after() {
+	public void after() throws Exception {
 		if(testingServer != null) {
-			try {
-				testingServer.stop();
-			} catch (IOException e) {
-				throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
-			}
+			testingServer.stop();
+
 			testingServer = null;
 		}
+
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+
+			highAvailabilityServices = null;
+		}
 	}
 
 	/**
-	 * Tests that LeaderRetrievalUtils.findConnectingAdress finds the correct connecting address
+	 * Tests that LeaderRetrievalUtils.findConnectingAddress finds the correct connecting address
 	 * in case of an old leader address in ZooKeeper and a subsequent election of a new leader.
 	 * The findConnectingAddress should block until the new leader has been elected and his
 	 * address has been written to ZooKeeper.
@@ -83,13 +94,9 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception {
 		FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
-		Configuration config = new Configuration();
 
 		long sleepingTime = 1000;
 
-		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-
 		LeaderElectionService leaderElectionService = null;
 		LeaderElectionService faultyLeaderElectionService;
 
@@ -98,12 +105,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		Thread thread;
 
-		CuratorFramework[] client = new CuratorFramework[2];
-
 		try {
-			client[0] = ZooKeeperUtils.startCuratorFramework(config);
-			client[1] = ZooKeeperUtils.startCuratorFramework(config);
-
 			String wrongAddress = AkkaRpcServiceUtils.getRpcUrl(
 				"1.1.1.1",
 				1234,
@@ -126,7 +128,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			}
 
 			InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
-			String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
 
 			String correctAddress = AkkaRpcServiceUtils.getRpcUrl(
 				localHost.getHostName(),
@@ -135,18 +136,21 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
 				config);
 
-			faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
+			faultyLeaderElectionService = highAvailabilityServices.getJobManagerLeaderElectionService(
+				HighAvailabilityServices.DEFAULT_JOB_ID);
 			TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
 
 			faultyLeaderElectionService.start(wrongLeaderAddressContender);
 
-			FindConnectingAddress findConnectingAddress = new FindConnectingAddress(config, timeout);
+			FindConnectingAddress findConnectingAddress = new FindConnectingAddress(
+				timeout,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID));
 
 			thread = new Thread(findConnectingAddress);
 
 			thread.start();
 
-			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[1], config);
+			leaderElectionService = highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID);
 			TestingContender correctLeaderAddressContender = new TestingContender(correctAddress, leaderElectionService);
 
 			Thread.sleep(sleepingTime);
@@ -174,14 +178,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			if (leaderElectionService != null) {
 				leaderElectionService.stop();
 			}
-
-			if (client[0] != null) {
-				client[0].close();
-			}
-
-			if (client[1] != null) {
-				client[1].close();
-			}
 		}
 	}
 
@@ -192,13 +188,9 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	 */
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-
-		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+		FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
 
-		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
+		LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
 
 		assertEquals(InetAddress.getLocalHost(), result);
@@ -206,22 +198,25 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 	static class FindConnectingAddress implements Runnable {
 
-		private final Configuration config;
 		private final FiniteDuration timeout;
+		private final LeaderRetrievalService leaderRetrievalService;
 
 		private InetAddress result;
 		private Exception exception;
 
-		public FindConnectingAddress(Configuration config, FiniteDuration timeout) {
-			this.config = config;
+		public FindConnectingAddress(
+				FiniteDuration timeout,
+				LeaderRetrievalService leaderRetrievalService) {
 			this.timeout = timeout;
+			this.leaderRetrievalService = leaderRetrievalService;
 		}
 
 		@Override
 		public void run() {
 			try {
-				LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
-				result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
+				result = LeaderRetrievalUtils.findConnectingAddress(
+					leaderRetrievalService,
+					timeout);
 			} catch (Exception e) {
 				exception = e;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 100c83d..58f2231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -26,10 +26,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
@@ -37,15 +36,17 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-public class TaskManagerMetricsTest {
+public class TaskManagerMetricsTest extends TestLogger {
 
 	/**
 	 * Tests the metric registry life cycle on JobManager re-connects.
@@ -53,6 +54,9 @@ public class TaskManagerMetricsTest {
 	@Test
 	public void testMetricRegistryLifeCycle() throws Exception {
 		ActorSystem actorSystem = null;
+
+		HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
 		try {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
@@ -64,11 +68,10 @@ public class TaskManagerMetricsTest {
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				JobManager.class,
 				MemoryArchivist.class)._1();
 
-			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
-
 			// ================================================================
 			// Start TaskManager
 			// ================================================================
@@ -94,7 +97,7 @@ public class TaskManagerMetricsTest {
 				taskManagerServices.getMemoryManager(),
 				taskManagerServices.getIOManager(),
 				taskManagerServices.getNetworkEnvironment(),
-				leaderRetrievalService,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				tmRegistry);
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);
@@ -107,20 +110,21 @@ public class TaskManagerMetricsTest {
 							getTestActor());
 
 						// wait for the TM to be registered
-						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+						TaskManagerMessages.RegisteredAtJobManager registeredAtJobManager = expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
+						UUID leaderId = registeredAtJobManager.leaderId();
 
 						// trigger re-registration of TM; this should include a disconnect from the current JM
 						taskManager.tell(
 							new TaskManagerMessages.JobManagerLeaderAddress(
 								jobManager.path().toString(),
-								HighAvailabilityServices.DEFAULT_LEADER_ID),
+								leaderId),
 							jobManager);
 
 						// wait for re-registration to be completed
 						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
 							getTestActor());
 
-						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+						expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
 					}
 				};
 			}};
@@ -135,6 +139,10 @@ public class TaskManagerMetricsTest {
 			if (actorSystem != null) {
 				actorSystem.shutdown();
 			}
+
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index cecfe6a..973fddf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -44,7 +46,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -52,23 +53,24 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-public class TaskManagerComponentsStartupShutdownTest {
+public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 
 	/**
 	 * Makes sure that all components are shut down when the TaskManager
 	 * actor is shut down.
 	 */
 	@Test
-	public void testComponentsStartupShutdown() {
+	public void testComponentsStartupShutdown() throws Exception {
 
 		final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
 		final Time timeout = Time.seconds(100);
@@ -80,21 +82,28 @@ public class TaskManagerComponentsStartupShutdownTest {
 		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
 
 		ActorSystem actorSystem = null;
+
+		HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
+		ActorRef jobManager = null;
+		ActorRef taskManager = null;
+
 		try {
 			actorSystem = AkkaUtils.createLocalActorSystem(config);
 
-			final ActorRef jobManager = JobManager.startJobManagerActors(
+			jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				JobManager.class,
 				MemoryArchivist.class)._1();
 
 			FlinkResourceManager.startResourceManagerActors(
 				config,
 				actorSystem,
-				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				StandaloneResourceManager.class);
 
 			final int numberOfSlots = 1;
@@ -137,7 +146,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			network.start();
 
-			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
+			LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID);
 
 			MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
 
@@ -154,18 +164,20 @@ public class TaskManagerComponentsStartupShutdownTest {
 				leaderRetrievalService,
 				new MetricRegistry(metricRegistryConfiguration));
 
-			final ActorRef taskManager = actorSystem.actorOf(tmProps);
+			taskManager = actorSystem.actorOf(tmProps);
+
+			final ActorRef finalTaskManager = taskManager;
 
 			new JavaTestKit(actorSystem) {{
 
 				// wait for the TaskManager to be registered
-				new Within(new FiniteDuration(5000, TimeUnit.SECONDS)) {
+				new Within(new FiniteDuration(5000L, TimeUnit.SECONDS)) {
 					@Override
 					protected void run() {
-						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+						finalTaskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
 								getTestActor());
 
-						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+						expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
 					}
 				};
 			}};
@@ -184,15 +196,16 @@ public class TaskManagerComponentsStartupShutdownTest {
 			assertTrue(network.isShutdown());
 			assertTrue(ioManager.isProperlyShutDown());
 			assertTrue(memManager.isShutdown());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
+			TestingUtils.stopActorsGracefully(Arrays.asList(jobManager, taskManager));
+
 			if (actorSystem != null) {
 				actorSystem.shutdown();
+
+				actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index c0d0455..a760760 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.junit.Test;
 
 import scala.Tuple2;
@@ -44,16 +47,23 @@ import static org.junit.Assert.*;
 public class TaskManagerConfigurationTest {
 
 	@Test
-	public void testUsePreconfiguredNetworkInterface() {
+	public void testUsePreconfiguredNetworkInterface() throws Exception {
+		final String TEST_HOST_NAME = "testhostname";
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+
+		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			Executors.directExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
 		try {
-			final String TEST_HOST_NAME = "testhostname";
 
-			Configuration config = new Configuration();
-			config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
 
-			Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config);
+			Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
 
 			// validate the configured test host name
 			assertEquals(TEST_HOST_NAME, address._1());
@@ -61,30 +71,37 @@ public class TaskManagerConfigurationTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 
 	@Test
-	public void testActorSystemPortConfig() {
-		try {
-			// config with pre-configured hostname to speed up tests (no interface selection)
-			Configuration config = new Configuration();
-			config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+	public void testActorSystemPortConfig() throws Exception {
+		// config with pre-configured hostname to speed up tests (no interface selection)
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+
+		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			Executors.directExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
+		try {
 			// auto port
-			assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config)._2());
+			assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
 
 			// pre-defined port
 			final int testPort = 22551;
 			config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, testPort);
-			assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config)._2());
+			assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
 
 			// invalid port
 			try {
 				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
-				TaskManager.selectNetworkInterfaceAndPort(config);
+				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -94,7 +111,7 @@ public class TaskManagerConfigurationTest {
 			// invalid port
 			try {
 				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 100000);
-				TaskManager.selectNetworkInterfaceAndPort(config);
+				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -104,6 +121,8 @@ public class TaskManagerConfigurationTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 
@@ -136,7 +155,7 @@ public class TaskManagerConfigurationTest {
 	}
 
 	@Test
-	public void testNetworkInterfaceSelection() {
+	public void testNetworkInterfaceSelection() throws Exception {
 		ServerSocket server;
 		String hostname = "localhost";
 
@@ -149,20 +168,27 @@ public class TaskManagerConfigurationTest {
 			return;
 		}
 
-		try {
-			// open a server port to allow the system to connect
-			Configuration config = new Configuration();
+		// open a server port to allow the system to connect
+		Configuration config = new Configuration();
+
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
+		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			Executors.directExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
-			assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config)._1());
+		try {
+			assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._1());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 		finally {
+			highAvailabilityServices.closeAndCleanupAllData();
+
 			try {
 				server.close();
 			} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index d904004..130610c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -23,17 +23,20 @@ import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import scala.Some;
 import scala.Tuple2;
@@ -56,7 +59,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests that the TaskManager process properly exits when the TaskManager actor dies.
  */
-public abstract class TaskManagerProcessReapingTestBase {
+public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
 
 	/**
 	 * Called after the task manager has been started up. After calling this
@@ -72,12 +75,25 @@ public abstract class TaskManagerProcessReapingTestBase {
 	}
 
 	@Test
-	public void testReapProcessOnFailure() {
+	public void testReapProcessOnFailure() throws Exception {
 		Process taskManagerProcess = null;
 		ActorSystem jmActorSystem = null;
 
 		final StringWriter processOutput = new StringWriter();
 
+		final Configuration config = new Configuration();
+
+		final String jobManagerHostname = "localhost";
+		final int jobManagerPort = NetUtils.getAvailablePort();
+
+		config.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
+		config.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
 		try {
 			String javaCommand = getJavaCommandPath();
 
@@ -93,29 +109,23 @@ public abstract class TaskManagerProcessReapingTestBase {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
 			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-			jmActorSystem = AkkaUtils.createActorSystem(
-					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(jobManagerHostname, jobManagerPort);
+			jmActorSystem = AkkaUtils.createActorSystem(config, new Some<>(localAddress));
 
 			ActorRef jmActor = JobManager.startJobManagerActors(
-				new Configuration(),
+				config,
 				jmActorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				JobManager.class,
 				MemoryArchivist.class)._1;
 
-			// start a ResourceManager
-			StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
-				new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
 			FlinkResourceManager.startResourceManagerActors(
 				new Configuration(),
 				jmActorSystem,
-				standaloneLeaderRetrievalService,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				StandaloneResourceManager.class);
 
 			final int taskManagerPort = NetUtils.getAvailablePort();
@@ -205,6 +215,9 @@ public abstract class TaskManagerProcessReapingTestBase {
 			if (jmActorSystem != null) {
 				jmActorSystem.shutdown();
 			}
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
 		}
 	}
 
@@ -222,18 +235,28 @@ public abstract class TaskManagerProcessReapingTestBase {
 
 	public static class TaskManagerTestEntryPoint {
 
-		public static void main(String[] args) {
-			try {
-				int jobManagerPort = Integer.parseInt(args[0]);
-				int taskManagerPort = Integer.parseInt(args[1]);
+		public static void main(String[] args) throws Exception {
+			int jobManagerPort = Integer.parseInt(args[0]);
+			int taskManagerPort = Integer.parseInt(args[1]);
 
-				Configuration cfg = new Configuration();
-				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
+			Configuration cfg = new Configuration();
+			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
 
-				TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
+			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				cfg,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
+			try {
+				TaskManager.runTaskManager(
+					"localhost",
+					ResourceID.generate(),
+					taskManagerPort,
+					cfg,
+					highAvailabilityServices);
 
 				// wait forever
 				Object lock = new Object();
@@ -243,6 +266,8 @@ public abstract class TaskManagerProcessReapingTestBase {
 			}
 			catch (Throwable t) {
 				System.exit(1);
+			} finally {
+				highAvailabilityServices.closeAndCleanupAllData();
 			}
 		}
 	}