You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:12 UTC

[07/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 5f35229..92de31a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,12 +28,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -43,22 +45,29 @@ import org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Matchers;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.createJobManager;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully;
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * The tests in this class verify the behavior of the TaskManager
@@ -67,8 +76,6 @@ import static org.junit.Assert.*;
  */
 public class TaskManagerRegistrationTest extends TestLogger {
 
-	private static final Option<String> NONE_STRING = Option.empty();
-
 	// use one actor system throughout all tests
 	private static ActorSystem actorSystem;
 
@@ -76,6 +83,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 	private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
 
+	private TestingHighAvailabilityServices highAvailabilityServices;
+
 	@BeforeClass
 	public static void startActorSystem() {
 		config = new Configuration();
@@ -94,38 +103,58 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		}
 	}
 
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new TestingHighAvailabilityServices();
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		highAvailabilityServices.closeAndCleanupAllData();
+		highAvailabilityServices = null;
+	}
+
 	/**
 	 * A test that verifies that two TaskManagers correctly register at the
 	 * JobManager.
 	 */
 	@Test
-	public void testSimpleRegistration() {
+	public void testSimpleRegistration() throws Exception {
 		new JavaTestKit(actorSystem) {{
 
 			ActorGateway jobManager = null;
 			ActorGateway taskManager1 = null;
 			ActorGateway taskManager2 = null;
+			ActorGateway resourceManager = null;
+
+			EmbeddedHaServices embeddedHaServices = null;
 
 			try {
+				embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
+
 				// a simple JobManager
-				jobManager = createJobManager(
+				jobManager = TestingUtils.createJobManager(
 					actorSystem,
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
-					config);
-				startResourceManager(config, jobManager.actor());
+					config,
+					embeddedHaServices);
+
+				resourceManager = new AkkaActorGateway(
+					startResourceManager(config, embeddedHaServices),
+					jobManager.leaderSessionID());
 
 				// start two TaskManagers. it will automatically try to register
 				taskManager1 = createTaskManager(
 						actorSystem,
-						jobManager,
+						embeddedHaServices,
 						config,
 						true,
 						false);
 
 				taskManager2 = createTaskManager(
 						actorSystem,
-						jobManager,
+						embeddedHaServices,
 						config,
 						true,
 						false);
@@ -142,10 +171,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				Object response1 = Await.result(responseFuture1, timeout);
 				Object response2 = Await.result(responseFuture2, timeout);
 
-				// this is a hack to work around the way Java can interact with scala case objects
-				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
-				assertTrue(response1 != null && confirmClass.isAssignableFrom(response1.getClass()));
-				assertTrue(response2 != null && confirmClass.isAssignableFrom(response2.getClass()));
+				assertTrue(response1 instanceof TaskManagerMessages.RegisteredAtJobManager);
+				assertTrue(response2 instanceof TaskManagerMessages.RegisteredAtJobManager);
 
 				// check that the JobManager has 2 TaskManagers registered
 				Future<Object> numTaskManagersFuture = jobManager.ask(
@@ -159,9 +186,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				e.printStackTrace();
 				fail(e.getMessage());
 			} finally {
-				stopActor(taskManager1);
-				stopActor(taskManager2);
-				stopActor(jobManager);
+				stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
+
+				embeddedHaServices.closeAndCleanupAllData();
 			}
 		}};
 	}
@@ -171,36 +198,35 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	 * JobManager.
 	 */
 	@Test
-	public void testDelayedRegistration() {
+	public void testDelayedRegistration() throws Exception {
 		new JavaTestKit(actorSystem) {{
 			ActorGateway jobManager = null;
 			ActorGateway taskManager = null;
 
-			FiniteDuration delayedTimeout = timeout.$times(3);
+			FiniteDuration delayedTimeout = timeout.$times(3L);
+
+			final EmbeddedHaServices embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
 
 			try {
 				// start a TaskManager that tries to register at the JobManager before the JobManager is
 				// available. we give it the regular JobManager akka URL
 				taskManager = createTaskManager(
 						actorSystem,
-						AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
+						embeddedHaServices,
 						new Configuration(),
 						true,
 						false);
 
 				// let it try for a bit
-				Thread.sleep(6000);
+				Thread.sleep(6000L);
 
 				// now start the JobManager, with the regular akka URL
-				jobManager = createJobManager(
+				jobManager = TestingUtils.createJobManager(
 					actorSystem,
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
-					new Configuration());
-
-				startResourceManager(config, jobManager.actor());
-
-				startResourceManager(config, jobManager.actor());
+					new Configuration(),
+					embeddedHaServices);
 
 				// check that the TaskManagers are registered
 				Future<Object> responseFuture = taskManager.ask(
@@ -209,17 +235,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				Object response = Await.result(responseFuture, delayedTimeout);
 
-				// this is a hack to work around the way Java can interact with scala case objects
-				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
-				assertTrue(response != null && confirmClass.isAssignableFrom(response.getClass()));
-
-			}
-			catch (Exception e) {
-				e.printStackTrace();
-				fail(e.getMessage());
+				assertTrue(response instanceof TaskManagerMessages.RegisteredAtJobManager);
 			} finally {
-				stopActor(taskManager);
-				stopActor(jobManager);
+				stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
+
+				embeddedHaServices.closeAndCleanupAllData();
 			}
 		}};
 	}
@@ -246,10 +266,17 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				Configuration tmConfig = new Configuration();
 				tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms");
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					// Give a non-existent job manager address to the task manager
+					new TestingLeaderRetrievalService(
+						"foobar",
+						HighAvailabilityServices.DEFAULT_LEADER_ID));
+
 				// start the taskManager actor
 				taskManager = createTaskManager(
 						actorSystem,
-						AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
+						highAvailabilityServices,
 						tmConfig,
 						true,
 						false);
@@ -271,7 +298,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				e.printStackTrace();
 				fail(e.getMessage());
 			} finally {
-				stopActor(taskManager);
+				stopActorGracefully(taskManager);
 			}
 		}};
 	}
@@ -286,18 +313,28 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			ActorGateway jm = null;
 			ActorGateway taskManager =null;
 			try {
-				jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
+				jm = TestingUtils.createForwardingActor(
+					actorSystem,
+					getTestActor(),
+					HighAvailabilityServices.DEFAULT_LEADER_ID,
+					Option.<String>empty());
 				final ActorGateway jmGateway = jm;
 
 				FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
 				Configuration tmConfig = new Configuration(config);
 				tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new TestingLeaderRetrievalService(
+						jm.path(),
+						HighAvailabilityServices.DEFAULT_LEADER_ID));
+
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages
 				taskManager = createTaskManager(
 					actorSystem,
-					jmGateway,
+					highAvailabilityServices,
 					tmConfig,
 					true,
 					false);
@@ -330,13 +367,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 						expectMsgClass(RegisterTaskManager.class);
 					}
 				};
-			}
-			catch (Throwable e) {
-				e.printStackTrace();
-				fail(e.getMessage());
 			} finally {
-				stopActor(taskManager);
-				stopActor(jm);
+				stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
 			}
 		}};
 	}
@@ -353,7 +385,18 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			try {
 				FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
 
-				jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
+				jm = TestingUtils.createForwardingActor(
+					actorSystem,
+					getTestActor(),
+					HighAvailabilityServices.DEFAULT_LEADER_ID,
+					Option.<String>empty());
+
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new TestingLeaderRetrievalService(
+						jm.path(),
+						HighAvailabilityServices.DEFAULT_LEADER_ID));
+
 				final ActorGateway jmGateway = jm;
 
 				long refusedRegistrationPause = 500;
@@ -368,7 +411,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// the messages
 				taskManager = createTaskManager(
 					actorSystem,
-					jmGateway,
+					highAvailabilityServices,
 					tmConfig,
 					true,
 					false);
@@ -423,8 +466,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 					+ maxExpectedNumberOfRegisterTaskManagerMessages,
 					registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages);
 			} finally {
-				stopActor(taskManager);
-				stopActor(jm);
+				stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
 			}
 		}};
 	}
@@ -444,16 +486,25 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				fakeJobManager1Gateway = TestingUtils.createForwardingActor(
-						actorSystem,
-						getTestActor(),
-						Option.apply(JOB_MANAGER_NAME));
+					actorSystem,
+					getTestActor(),
+					HighAvailabilityServices.DEFAULT_LEADER_ID,
+					Option.apply(JOB_MANAGER_NAME));
 				final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway;
 
+				TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+					fakeJM1Gateway.path(),
+					HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					testingLeaderRetrievalService);
+
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages
 				taskManagerGateway = createTaskManager(
 						actorSystem,
-						fakeJobManager1Gateway,
+						highAvailabilityServices,
 						config,
 						true,
 						false);
@@ -512,9 +563,10 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				do {
 					try {
 						fakeJobManager2Gateway = TestingUtils.createForwardingActor(
-								actorSystem,
-								getTestActor(),
-								Option.apply(JOB_MANAGER_NAME));
+							actorSystem,
+							getTestActor(),
+							HighAvailabilityServices.DEFAULT_LEADER_ID,
+							Option.apply(JOB_MANAGER_NAME));
 					} catch (InvalidActorNameException e) {
 						// wait and retry
 						Thread.sleep(100);
@@ -543,9 +595,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				e.printStackTrace();
 				fail(e.getMessage());
 			} finally {
-				stopActor(taskManagerGateway);
-				stopActor(fakeJobManager1Gateway);
-				stopActor(fakeJobManager2Gateway);
+				stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway));
 			}
 		}};
 	}
@@ -556,21 +606,25 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			ActorGateway taskManagerGateway = null;
 
+			final UUID falseLeaderSessionID = UUID.randomUUID();
+			final UUID trueLeaderSessionID = UUID.randomUUID();
+
+			HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class);
+			when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
+				.thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID));
+
 			try {
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages
 				taskManagerGateway = createTaskManager(
 						actorSystem,
-						getTestActor(),
+						mockedHighAvailabilityServices,
 						config,
 						true,
 						false);
 
 				final ActorRef taskManager = taskManagerGateway.actor();
 
-				final UUID falseLeaderSessionID = UUID.randomUUID();
-				final UUID trueLeaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
-
 				new Within(timeout) {
 
 					@Override
@@ -581,7 +635,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 						LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
 
-						assertTrue(lsm.leaderSessionID() == trueLeaderSessionID);
+						assertTrue(lsm.leaderSessionID().equals(trueLeaderSessionID));
 						assertTrue(lsm.message() instanceof RegisterTaskManager);
 
 						final ActorRef tm = getLastSender();
@@ -606,9 +660,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 								getTestActor());
 
 						Object message = null;
-						Object confirmMessageClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
 
-						while(message == null || !(message.getClass().equals(confirmMessageClass))) {
+						while(!(message instanceof TaskManagerMessages.RegisteredAtJobManager)) {
 							message = receiveOne(TestingUtils.TESTING_DURATION());
 						}
 
@@ -617,12 +670,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 						expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(trueLeaderSessionID));
 					}
 				};
-			}
-			catch (Throwable e) {
-				e.printStackTrace();
-				fail(e.getMessage());
 			} finally {
-				stopActor(taskManagerGateway);
+				stopActorGracefully(taskManagerGateway);
 			}
 		}};
 	}
@@ -631,25 +680,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	//  Utility Functions
 	// --------------------------------------------------------------------------------------------
 
-	private static ActorRef startJobManager(Configuration configuration) throws Exception {
-		// start the actors. don't give names, so they get generated names and we
-		// avoid conflicts with the actor names
-		return JobManager.startJobManagerActors(
-			configuration,
-			actorSystem,
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			NONE_STRING,
-			NONE_STRING,
-			JobManager.class,
-			MemoryArchivist.class)._1();
-	}
-
-	private static ActorRef startResourceManager(Configuration config, ActorRef jobManager) {
+	private static ActorRef startResourceManager(Configuration config, HighAvailabilityServices highAvailabilityServices) {
 		return FlinkResourceManager.startResourceManagerActors(
 			config,
 			actorSystem,
-			new StandaloneLeaderRetrievalService(jobManager.path().toString()),
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 			StandaloneResourceManager.class);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 4df8db3..0e77ddd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -26,9 +26,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import scala.Option;
 
@@ -46,6 +50,21 @@ import java.util.UUID;
  * problems.
  */
 public class TaskManagerStartupTest {
+
+	private HighAvailabilityServices highAvailabilityServices;
+
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+			highAvailabilityServices = null;
+		}
+	}
 	
 
 	/**
@@ -55,8 +74,9 @@ public class TaskManagerStartupTest {
 	 * @throws Throwable
 	 */
 	@Test(expected = BindException.class)
-	public void testStartupWhenTaskmanagerActorPortIsUsed() throws BindException {
+	public void testStartupWhenTaskmanagerActorPortIsUsed() throws Exception {
 		ServerSocket blocker = null;
+
 		try {
 			final String localHostName = "localhost";
 			final InetAddress localBindAddress = InetAddress.getByName(NetUtils.getWildcardIPAddress());
@@ -65,8 +85,13 @@ public class TaskManagerStartupTest {
 			blocker = new ServerSocket(0, 50, localBindAddress);
 			final int port = blocker.getLocalPort();
 
-			TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(),
-					TaskManager.class);
+			TaskManager.runTaskManager(
+				localHostName,
+				ResourceID.generate(),
+				port,
+				new Configuration(),
+				highAvailabilityServices,
+				TaskManager.class);
 			fail("This should fail with an IOException");
 
 		}
@@ -92,6 +117,8 @@ public class TaskManagerStartupTest {
 					// no need to log here
 				}
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 
@@ -102,7 +129,7 @@ public class TaskManagerStartupTest {
 	 * directories are not writable.
 	 */
 	@Test
-	public void testIODirectoryNotWritable() {
+	public void testIODirectoryNotWritable() throws Exception {
 		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
 		File nonWritable = new File(tempDir, UUID.randomUUID().toString());
 
@@ -119,7 +146,12 @@ public class TaskManagerStartupTest {
 			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
 			try {
-				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+				TaskManager.runTaskManager(
+					"localhost",
+					ResourceID.generate(),
+					0,
+					cfg,
+					highAvailabilityServices);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IOException e) {
@@ -139,6 +171,8 @@ public class TaskManagerStartupTest {
 			catch (IOException e) {
 				// best effort
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 
@@ -157,7 +191,12 @@ public class TaskManagerStartupTest {
 			// something invalid
 			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
 			try {
-				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+				TaskManager.runTaskManager(
+					"localhost",
+					ResourceID.generate(),
+					0,
+					cfg,
+					highAvailabilityServices);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -169,7 +208,12 @@ public class TaskManagerStartupTest {
 					TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20;
 			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize);
 			try {
-				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+				TaskManager.runTaskManager(
+					"localhost",
+					ResourceID.generate(),
+					0,
+					cfg,
+					highAvailabilityServices);
 				fail("Should fail synchronously with an exception");
 			}
 			catch (Exception e) {
@@ -204,9 +248,9 @@ public class TaskManagerStartupTest {
 				cfg,
 				ResourceID.generate(),
 				null,
+				highAvailabilityServices,
 				"localhost",
 				Option.<String>empty(),
-				Option.<LeaderRetrievalService>empty(),
 				false,
 				TaskManager.class);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 4530ade..0f5afc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -60,6 +61,8 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
@@ -80,8 +83,10 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -126,7 +131,9 @@ public class TaskManagerTest extends TestLogger {
 
 	private static ActorSystem system;
 
-	final static UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
+	final static UUID leaderSessionID = UUID.randomUUID();
+
+	private TestingHighAvailabilityServices highAvailabilityServices;
 
 	@BeforeClass
 	public static void setup() {
@@ -138,20 +145,39 @@ public class TaskManagerTest extends TestLogger {
 		JavaTestKit.shutdownActorSystem(system);
 	}
 
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new TestingHighAvailabilityServices();
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+
+			highAvailabilityServices = null;
+		}
+	}
+
 	@Test
 	public void testSubmitAndExecuteTask() throws IOException {
 		new JavaTestKit(system){{
 
 			ActorGateway taskManager = null;
 			final ActorGateway jobManager = TestingUtils.createForwardingActor(
-					system,
-					getTestActor(),
-					Option.<String>empty());
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
 
 			try {
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						false);
@@ -261,9 +287,13 @@ public class TaskManagerTest extends TestLogger {
 				ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						true);
@@ -398,9 +428,13 @@ public class TaskManagerTest extends TestLogger {
 				ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						true);
@@ -524,9 +558,13 @@ public class TaskManagerTest extends TestLogger {
 				ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						true);
@@ -624,9 +662,13 @@ public class TaskManagerTest extends TestLogger {
 				ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID)));
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						true);
@@ -769,9 +811,13 @@ public class TaskManagerTest extends TestLogger {
 
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						true);
@@ -906,6 +952,10 @@ public class TaskManagerTest extends TestLogger {
 
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				final int dataPort = NetUtils.getAvailablePort();
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
@@ -914,7 +964,7 @@ public class TaskManagerTest extends TestLogger {
 
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						config,
 						false,
 						true);
@@ -1024,13 +1074,17 @@ public class TaskManagerTest extends TestLogger {
 
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				final Configuration config = new Configuration();
 				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
 				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
 				taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						config,
 						true,
 						true);
@@ -1122,9 +1176,13 @@ public class TaskManagerTest extends TestLogger {
 				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 				config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
 
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				taskManager = TestingUtils.createTaskManager(
 					system,
-					jobManager,
+					highAvailabilityServices,
 					config,
 					false,
 					true);
@@ -1166,18 +1224,26 @@ public class TaskManagerTest extends TestLogger {
 
 			// We need this to be a JM that answers to update messages for
 			// robustness on Travis (if jobs need to be resubmitted in (4)).
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID)));
-			ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, leaderSessionID);
+			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(
+				HighAvailabilityServices.DEFAULT_LEADER_ID)));
+			ActorGateway jobManagerActorGateway = new AkkaActorGateway(
+				jm,
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			final ActorGateway testActorGateway = new AkkaActorGateway(
 					getTestActor(),
-					leaderSessionID);
+					HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			try {
 				final ActorGateway jobManager = jobManagerActorGateway;
+
+				highAvailabilityServices.setJobMasterLeaderRetriever(
+					HighAvailabilityServices.DEFAULT_JOB_ID,
+					new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 				final ActorGateway taskManager = TestingUtils.createTaskManager(
 						system,
-						jobManager,
+						highAvailabilityServices,
 						new Configuration(),
 						true,
 						false);
@@ -1459,11 +1525,15 @@ public class TaskManagerTest extends TestLogger {
 
 	@Test
 	public void testTerminationOnFatalError() {
+		highAvailabilityServices.setJobMasterLeaderRetriever(
+			HighAvailabilityServices.DEFAULT_JOB_ID,
+			new TestingLeaderRetrievalService());
+
 		new JavaTestKit(system){{
 
 			final ActorGateway taskManager = TestingUtils.createTaskManager(
 					system,
-					system.deadLetters(), // no jobmanager
+					highAvailabilityServices, // no jobmanager
 					new Configuration(),
 					true,
 					false);
@@ -1524,9 +1594,13 @@ public class TaskManagerTest extends TestLogger {
 			ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, leaderSessionID), "jobmanager");
 			ActorGateway jobManager = new AkkaActorGateway(jmActorRef, leaderSessionID);
 
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 			final ActorGateway taskManager = TestingUtils.createTaskManager(
 				system,
-				jobManager,
+				highAvailabilityServices,
 				configuration,
 				true,
 				true);
@@ -1562,9 +1636,13 @@ public class TaskManagerTest extends TestLogger {
 			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 			jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 			taskManager = TestingUtils.createTaskManager(
 				system,
-				jobManager,
+				highAvailabilityServices,
 				new Configuration(),
 				true,
 				true);
@@ -1619,9 +1697,13 @@ public class TaskManagerTest extends TestLogger {
 			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 			jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 			taskManager = TestingUtils.createTaskManager(
 				system,
-				jobManager,
+				highAvailabilityServices,
 				new Configuration(),
 				true,
 				true);
@@ -1679,9 +1761,13 @@ public class TaskManagerTest extends TestLogger {
 			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 			jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 			taskManager = TestingUtils.createTaskManager(
 				system,
-				jobManager,
+				highAvailabilityServices,
 				new Configuration(),
 				true,
 				true);
@@ -1724,9 +1810,13 @@ public class TaskManagerTest extends TestLogger {
 			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
 			jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
+			highAvailabilityServices.setJobMasterLeaderRetriever(
+				HighAvailabilityServices.DEFAULT_JOB_ID,
+				new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
 			taskManager = TestingUtils.createTaskManager(
 				system,
-				jobManager,
+				highAvailabilityServices,
 				new Configuration(),
 				true,
 				true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index b2e8005..4be3299 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.net.InetAddress
-import java.util.concurrent.{Executors, ScheduledExecutorService}
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
 
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices
 import org.apache.flink.runtime.instance._
 import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
@@ -40,7 +41,7 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.junit.Assert.{assertNotEquals, assertNotNull}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, WordSpecLike}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -51,31 +52,45 @@ import scala.language.postfixOps
  */
 @RunWith(classOf[JUnitRunner])
 class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
-ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
 
   def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
 
   val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+
+  var highAvailabilityServices: HighAvailabilityServices = _
+
+  val timeout = FiniteDuration(30, TimeUnit.SECONDS)
   
   override def afterAll(): Unit = {
     executor.shutdownNow()
     TestKit.shutdownActorSystem(system)
   }
 
+  override def beforeEach(): Unit = {
+    highAvailabilityServices = new EmbeddedHaServices(executor)
+  }
+
+  override def afterEach(): Unit = {
+    if (highAvailabilityServices != null) {
+      highAvailabilityServices.closeAndCleanupAllData()
+    }
+  }
+
   "The JobManager" should {
 
     "assign a TaskManager a unique instance ID" in {
 
       var jmOption: Option[ActorGateway] = None
-      var rmOption: Option[ActorGateway] = None
+      var rmOption: Option[ActorRef] = None
       var tm1Option: Option[ActorRef] = None
       var tm2Option: Option[ActorRef] = None
 
       try {
-        val jm = startTestingJobManager(_system)
+        val jm = startTestingJobManager(_system, highAvailabilityServices)
         jmOption = Some(jm)
 
-        val rm = startTestingResourceManager(_system, jm.actor())
+        val rm = startTestingResourceManager(_system, highAvailabilityServices)
         rmOption = Some(rm)
 
         val probe = TestProbe()
@@ -149,19 +164,19 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     "handle repeated registration calls" in {
 
       var jmOption: Option[ActorGateway] = None
-      var rmOption: Option[ActorGateway] = None
+      var rmOption: Option[ActorRef] = None
 
       try {
         val probe = TestProbe()
 
-        val jm = startTestingJobManager(_system)
+        val jm = startTestingJobManager(_system, highAvailabilityServices)
         jmOption = Some(jm)
-        val rm = startTestingResourceManager(_system, jm.actor())
+        val rm = startTestingResourceManager(_system, highAvailabilityServices)
         rmOption = Some(rm)
 
-        val selfGateway = new AkkaActorGateway(
-          probe.ref,
-          HighAvailabilityServices.DEFAULT_LEADER_ID)
+        val leaderId = jm.leaderSessionID()
+
+        val selfGateway = new AkkaActorGateway(probe.ref, leaderId)
 
         val resourceID = ResourceID.generate()
         val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1)
@@ -199,22 +214,22 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-            HighAvailabilityServices.DEFAULT_LEADER_ID,
-            AcknowledgeRegistration(_, _)) =>
+              `leaderId`,
+              AcknowledgeRegistration(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-            HighAvailabilityServices.DEFAULT_LEADER_ID,
-            AlreadyRegistered(_, _)) =>
+              `leaderId`,
+              AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-            HighAvailabilityServices.DEFAULT_LEADER_ID,
-            AlreadyRegistered(_, _)) =>
+              `leaderId`,
+              AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
         }
@@ -225,14 +240,16 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     }
   }
 
-  private def startTestingJobManager(system: ActorSystem): ActorGateway = {
-    val config = new Configuration()
+  private def startTestingJobManager(
+      system: ActorSystem,
+      highAvailabilityServices: HighAvailabilityServices): ActorGateway = {
 
+    val config = new Configuration()
+    
     val components = JobManager.createJobManagerComponents(
       config,
       executor,
-      executor,
-      None)
+      executor)
 
     // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService.
     // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus,
@@ -249,25 +266,34 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       ActorRef.noSender,
       components._4,
       components._5,
+      highAvailabilityServices.getJobManagerLeaderElectionService(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
+      highAvailabilityServices.getSubmittedJobGraphStore(),
+      highAvailabilityServices.getCheckpointRecoveryFactory(),
       components._8,
-      components._9,
-      components._10,
-      components._11,
       None)
 
-    val jm = _system.actorOf(props)
+    _system.actorOf(props)
 
-    new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID)
+    LeaderRetrievalUtils.retrieveLeaderGateway(
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
+      system,
+      timeout)
   }
 
-  private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway = {
+  private def startTestingResourceManager(
+      system: ActorSystem,
+      highAvailabilityServices: HighAvailabilityServices)
+    : ActorRef = {
     val config = new Configuration()
-    val rm: ActorRef = FlinkResourceManager.startResourceManagerActors(
+
+    FlinkResourceManager.startResourceManagerActors(
       config,
       _system,
-      LeaderRetrievalUtils.createLeaderRetrievalService(config, jm),
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
       classOf[TestingResourceManager])
-    new AkkaActorGateway(rm, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index bd4a8fc..7980cdf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
@@ -48,7 +49,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWh
 import org.apache.flink.runtime.testutils.TestingResourceManager
 
 import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 /**
  * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support
@@ -60,16 +61,35 @@ import scala.concurrent.{Await, Future}
  */
 class TestingCluster(
     userConfiguration: Configuration,
+    highAvailabilityServices: HighAvailabilityServices,
     singleActorSystem: Boolean,
     synchronousDispatcher: Boolean)
   extends LocalFlinkMiniCluster(
     userConfiguration,
+    highAvailabilityServices,
     singleActorSystem) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean) =
-    this(userConfiguration, singleActorSystem, false)
+  def this(
+      userConfiguration: Configuration,
+      singleActorSystem: Boolean,
+      synchronousDispatcher: Boolean) = {
+    this(
+      userConfiguration,
+      HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+        userConfiguration,
+        ExecutionContext.global),
+      singleActorSystem,
+      synchronousDispatcher)
+  }
+
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) = {
+    this(
+      userConfiguration,
+      singleActorSystem,
+      false)
+  }
 
-  def this(userConfiguration: Configuration) = this(userConfiguration, true, false)
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
   // --------------------------------------------------------------------------
 
@@ -224,10 +244,13 @@ class TestingCluster(
             Seq(newJobManagerActorSystem),
             1))
 
-          val lrs = createLeaderRetrievalService()
+          jobManagerLeaderRetrievalService.foreach(_.stop())
+
+          jobManagerLeaderRetrievalService = Option(
+            highAvailabilityServices.getJobManagerLeaderRetriever(
+              HighAvailabilityServices.DEFAULT_JOB_ID))
 
-          jobManagerLeaderRetrievalService = Some(lrs)
-          lrs.start(this)
+          jobManagerLeaderRetrievalService.foreach(_.start(this))
 
         case _ => throw new Exception("The JobManager of the TestingCluster have not " +
                                         "been started properly.")

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index d139a3f..c8977f0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -27,22 +27,18 @@ import akka.pattern.{Patterns, ask}
 import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
-import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
-import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import org.apache.flink.runtime.jobmaster.JobMaster
-import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
@@ -222,69 +218,13 @@ object TestingUtils {
     }
   }
 
-  def createTaskManager(
-    actorSystem: ActorSystem,
-    jobManager: ActorRef,
-    configuration: Configuration,
-    useLocalCommunication: Boolean,
-    waitForRegistration: Boolean)
-  : ActorGateway = {
-    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager)
-
-    createTaskManager(
-      actorSystem,
-      jobManagerURL,
-      configuration,
-      useLocalCommunication,
-      waitForRegistration
-    )
-  }
-
-  def createTaskManager(
-    actorSystem: ActorSystem,
-    jobManager: ActorGateway,
-    configuration: Configuration,
-    useLocalCommunication: Boolean,
-    waitForRegistration: Boolean,
-    taskManagerClass: Class[_ <: TaskManager])
-  : ActorGateway = {
-    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
-
-    createTaskManager(
-      actorSystem,
-      jobManagerURL,
-      configuration,
-      useLocalCommunication,
-      waitForRegistration,
-      taskManagerClass
-    )
-  }
-
-  def createTaskManager(
-      actorSystem: ActorSystem,
-      jobManager: ActorGateway,
-      configuration: Configuration,
-      useLocalCommunication: Boolean,
-      waitForRegistration: Boolean)
-    : ActorGateway = {
-    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
-
-    createTaskManager(
-      actorSystem,
-      jobManagerURL,
-      configuration,
-      useLocalCommunication,
-      waitForRegistration
-    )
-  }
-
   /** Creates a local TaskManager in the given ActorSystem. It is given a
     * [[StandaloneLeaderRetrievalService]] which returns the given jobManagerURL. After creating
     * the TaskManager, waitForRegistration specifies whether one waits until the TaskManager has
     * registered at the JobManager. An ActorGateway to the TaskManager is returned.
     *
     * @param actorSystem ActorSystem in which the TaskManager shall be started
-    * @param jobManagerURL URL of the JobManager to connect to
+    * @param highAvailabilityServices Service factory for high availability
     * @param configuration Configuration
     * @param useLocalCommunication true if the network stack shall use exclusively local
     *                              communication
@@ -293,15 +233,15 @@ object TestingUtils {
     * @return ActorGateway of the created TaskManager
     */
   def createTaskManager(
-    actorSystem: ActorSystem,
-    jobManagerURL: String,
-    configuration: Configuration,
-    useLocalCommunication: Boolean,
-    waitForRegistration: Boolean)
-  : ActorGateway = {
+      actorSystem: ActorSystem,
+      highAvailabilityServices: HighAvailabilityServices,
+      configuration: Configuration,
+      useLocalCommunication: Boolean,
+      waitForRegistration: Boolean)
+    : ActorGateway = {
     createTaskManager(
       actorSystem,
-      jobManagerURL,
+      highAvailabilityServices,
       configuration,
       useLocalCommunication,
       waitForRegistration,
@@ -311,7 +251,7 @@ object TestingUtils {
 
   def createTaskManager(
       actorSystem: ActorSystem,
-      jobManagerURL: String,
+      highAvailabilityServices: HighAvailabilityServices,
       configuration: Configuration,
       useLocalCommunication: Boolean,
       waitForRegistration: Boolean,
@@ -324,26 +264,27 @@ object TestingUtils {
 
     resultingConfiguration.addAll(configuration)
 
-    val leaderRetrievalService = Option(new StandaloneLeaderRetrievalService(jobManagerURL))
-
     val taskManager = TaskManager.startTaskManagerComponentsAndActor(
       resultingConfiguration,
       ResourceID.generate(),
       actorSystem,
+      highAvailabilityServices,
       "localhost",
       None,
-      leaderRetrievalService,
       useLocalCommunication,
       taskManagerClass
     )
 
-    if (waitForRegistration) {
+    val leaderId = if (waitForRegistration) {
       val notificationResult = (taskManager ? NotifyWhenRegisteredAtJobManager)(TESTING_DURATION)
+        .mapTo[RegisteredAtJobManager]
 
-      Await.ready(notificationResult, TESTING_DURATION)
+      Await.result(notificationResult, TESTING_DURATION).leaderId
+    } else {
+      HighAvailabilityServices.DEFAULT_LEADER_ID
     }
 
-    new AkkaActorGateway(taskManager, HighAvailabilityServices.DEFAULT_LEADER_ID)
+    new AkkaActorGateway(taskManager, leaderId)
   }
 
   /** Stops the given actor by sending it a Kill message
@@ -428,13 +369,15 @@ object TestingUtils {
       actorSystem: ActorSystem,
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
-      configuration: Configuration)
+      configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices)
     : ActorGateway = {
     createJobManager(
       actorSystem,
       futureExecutor,
       ioExecutor,
       configuration,
+      highAvailabilityServices,
       classOf[TestingJobManager],
       ""
     )
@@ -455,6 +398,7 @@ object TestingUtils {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices,
       prefix: String)
     : ActorGateway = {
     createJobManager(
@@ -462,6 +406,7 @@ object TestingUtils {
       futureExecutor,
       ioExecutor,
       configuration,
+      highAvailabilityServices,
       classOf[TestingJobManager],
       prefix
     )
@@ -474,6 +419,7 @@ object TestingUtils {
     * @param futureExecutor to run the JobManager's futures
     * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
+    * @param highAvailabilityServices Service factory for high availability
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
@@ -482,10 +428,18 @@ object TestingUtils {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
+    createJobManager(
+      actorSystem,
+      futureExecutor,
+      ioExecutor,
+      configuration,
+      highAvailabilityServices,
+      jobManagerClass,
+      "")
   }
 
   /**
@@ -496,6 +450,7 @@ object TestingUtils {
     * @param futureExecutor to run the JobManager's futures
     * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
+    * @param highAvailabilityServices Service factory for high availability
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
    * @return
@@ -505,6 +460,7 @@ object TestingUtils {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
     : ActorGateway = {
@@ -518,24 +474,33 @@ object TestingUtils {
         actorSystem,
         futureExecutor,
         ioExecutor,
+        highAvailabilityServices,
         Some(prefix + JobMaster.JOB_MANAGER_NAME),
         Some(prefix + JobMaster.ARCHIVE_NAME),
         jobManagerClass,
         classOf[MemoryArchivist])
 
-    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
+
+    val leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+        highAvailabilityServices.getJobManagerLeaderRetriever(
+          HighAvailabilityServices.DEFAULT_JOB_ID),
+        TestingUtils.TESTING_TIMEOUT)
+
+    new AkkaActorGateway(actor, leaderId)
   }
 
   /** Creates a forwarding JobManager which sends all received message to the forwarding target.
     *
     * @param actorSystem The actor system to start the actor in.
     * @param forwardingTarget Target to forward to.
+    * @param leaderId leader id for the actor gateway
     * @param actorName Name for forwarding Actor
     * @return
     */
   def createForwardingActor(
       actorSystem: ActorSystem,
       forwardingTarget: ActorRef,
+      leaderId: UUID,
       actorName: Option[String] = None)
     : ActorGateway = {
 
@@ -545,7 +510,7 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            Option(HighAvailabilityServices.DEFAULT_LEADER_ID)),
+            Option(leaderId)),
           name
         )
       case None =>
@@ -553,58 +518,39 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            Option(HighAvailabilityServices.DEFAULT_LEADER_ID))
+            Option(leaderId))
         )
     }
 
-    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
-  }
-
-  def submitJobAndWait(
-      actorSystem: ActorSystem,
-      jobManager: ActorGateway,
-      jobGraph: JobGraph,
-      config: Configuration)
-    : JobExecutionResult = {
-
-    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
-    val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerURL)
-
-    JobClient.submitJobAndWait(
-      actorSystem,
-      config,
-      leaderRetrievalService,
-      jobGraph,
-      TESTING_DURATION,
-      false,
-      Thread.currentThread().getContextClassLoader
-    )
+    new AkkaActorGateway(actor, leaderId)
   }
 
-  /** Creates a testing JobManager using the default recovery mode (standalone)
+  /** Creates a testing JobManager using the given configuration and high availability services.
     *
     * @param actorSystem The actor system to start the actor
-    * @param jobManager The jobManager for the standalone leader service.
     * @param configuration The configuration
+    * @param highAvailabilityServices The high availability services to use
     * @return
     */
   def createResourceManager(
       actorSystem: ActorSystem,
-      jobManager: ActorRef,
-      configuration: Configuration)
+      configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices)
   : ActorGateway = {
 
-    configuration.setString(
-      HighAvailabilityOptions.HA_MODE,
-      ConfigConstants.DEFAULT_HA_MODE)
-
-    val actor = FlinkResourceManager.startResourceManagerActors(
+    val resourceManager = FlinkResourceManager.startResourceManagerActors(
       configuration,
       actorSystem,
-      LeaderRetrievalUtils.createLeaderRetrievalService(configuration, jobManager),
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
       classOf[TestingResourceManager])
 
-    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
+    val leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
+      TestingUtils.TESTING_TIMEOUT)
+
+    new AkkaActorGateway(resourceManager, leaderId)
   }
 
   class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID])

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 4d3adb2..bb26454 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -25,6 +25,7 @@ import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.client.CliFrontend
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
 import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
 
@@ -140,18 +141,17 @@ object FlinkShell {
 
   def fetchConnectionInfo(
     config: Config
-  ): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = {
+  ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = GlobalConfiguration.loadConfiguration()
         config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
-        val miniCluster = new LocalFlinkMiniCluster(config, false)
-        miniCluster.start()
+        val miniCluster = new StandaloneMiniCluster(config)
 
         println("\nStarting local Flink cluster (host: localhost, " +
-          s"port: ${miniCluster.getLeaderRPCPort}).\n")
-        ("localhost", miniCluster.getLeaderRPCPort, Some(Left(miniCluster)))
+          s"port: ${miniCluster.getPort}).\n")
+        ("localhost", miniCluster.getPort, Some(Left(miniCluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -188,7 +188,7 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(config)
       val conf = cluster match {
-        case Some(Left(miniCluster)) => miniCluster.userConfiguration
+        case Some(Left(miniCluster)) => miniCluster.getConfiguration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => GlobalConfiguration.loadConfiguration()
       }
@@ -218,7 +218,7 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(miniCluster)) => miniCluster.stop()
+        case Some(Left(miniCluster)) => miniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 00410cc..0e89da3 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,14 +19,16 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.concurrent.TimeUnit
 
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import akka.actor.ActorRef
+import akka.pattern.Patterns
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.{AfterClass, Assert, BeforeClass, Test}
 
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
@@ -299,18 +301,19 @@ class ScalaShellITCase extends TestLogger {
       .getFile
     val confDir = new File(confFile).getAbsoluteFile.getParent
 
-    val (c, args) = cluster match{
+    val args = cluster match {
       case Some(cl) =>
-        val arg = Array("remote",
-          cl.hostname,
-          Integer.toString(cl.getLeaderRPCPort),
+        Array(
+          "remote",
+          cl.getHostname,
+          Integer.toString(cl.getPort),
           "--configDir",
           confDir)
-        (cl, arg)
-      case None =>
-        throw new AssertionError("Cluster creation failed.")
+      case None => throw new IllegalStateException("Cluster has not been started.")
     }
 
+
+
     //start scala shell with initialized
     // buffered reader for testing
     FlinkShell.bufferedReader = Some(in)
@@ -335,26 +338,24 @@ class ScalaShellITCase extends TestLogger {
 }
 
 object ScalaShellITCase {
-  var cluster: Option[LocalFlinkMiniCluster] = None
+  var cluster: Option[StandaloneMiniCluster] = None
+
   val parallelism = 4
+  val configuration = new Configuration()
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      false,
-      false,
-      false)
-
-    cluster = Some(cl)
+    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+
+    cluster = Option(new StandaloneMiniCluster(configuration))
   }
 
   @AfterClass
   def afterAll(): Unit = {
     // The Scala interpreter somehow changes the class loader. Therfore, we have to reset it
     Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
-    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+
+    cluster.foreach(_.close)
   }
 
   /**
@@ -371,47 +372,46 @@ object ScalaShellITCase {
     val oldOut = System.out
     System.setOut(new PrintStream(baos))
 
-    // new local cluster
-    val host = "localhost"
-    val port = cluster match {
-      case Some(c) => c.getLeaderRPCPort
-      case _ => throw new RuntimeException("Test cluster not initialized.")
-    }
-
-    val repl = externalJars match {
-      case Some(ej) => new FlinkILoop(
-        host, port,
-        GlobalConfiguration.loadConfiguration(),
-        Option(Array(ej)),
-        in, new PrintWriter(out))
-
-      case None => new FlinkILoop(
-        host, port,
-        GlobalConfiguration.loadConfiguration(),
-        in, new PrintWriter(out))
-    }
+    cluster match {
+      case Some(cl) =>
+        val repl = externalJars match {
+          case Some(ej) => new FlinkILoop(
+            cl.getHostname,
+            cl.getPort,
+            GlobalConfiguration.loadConfiguration(),
+            Option(Array(ej)),
+            in, new PrintWriter(out))
+
+          case None => new FlinkILoop(
+            cl.getHostname,
+            cl.getPort,
+            GlobalConfiguration.loadConfiguration(),
+            in, new PrintWriter(out))
+        }
 
-    repl.settings = new Settings()
+        repl.settings = new Settings()
 
-    // enable this line to use scala in intellij
-    repl.settings.usejavacp.value = true
+        // enable this line to use scala in intellij
+        repl.settings.usejavacp.value = true
 
-    externalJars match {
-      case Some(ej) => repl.settings.classpath.value = ej
-      case None =>
-    }
+        externalJars match {
+          case Some(ej) => repl.settings.classpath.value = ej
+          case None =>
+        }
 
-    repl.process(repl.settings)
+        repl.process(repl.settings)
 
-    repl.closeInterpreter()
+        repl.closeInterpreter()
 
-    System.setOut(oldOut)
+        System.setOut(oldOut)
 
-    baos.flush()
+        baos.flush()
 
-    val stdout = baos.toString
+        val stdout = baos.toString
 
-    out.toString + stdout
+        out.toString + stdout
+      case _ => throw new IllegalStateException("The cluster has not been started.")
+    }
   }
 
   def findLibraryFolder(paths: String*): File = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ff5af87..2ff45ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -219,7 +219,11 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			throw new ProgramInvocationException("The program execution failed" + term, e);
 		}
 		finally {
-			client.shutdown();
+			try {
+				client.shutdown();
+			} catch (Exception e) {
+				LOG.warn("Could not properly shut down the cluster client.", e);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 64c68dc..5719456 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,43 +27,84 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.Preconditions;
 
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
 /**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ * A {@link StreamExecutionEnvironment} that executes its jobs on {@link LocalFlinkMiniCluster}.
  */
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 	/** The mini cluster in which this environment executes its jobs */
-	private LocalFlinkMiniCluster executor;
-	
+	private final LocalFlinkMiniCluster miniCluster;
+
+	private final Collection<Path> jarFiles;
+
+	private final Collection<URL> classPaths;
+
+	public TestStreamEnvironment(
+			LocalFlinkMiniCluster miniCluster,
+			int parallelism,
+			Collection<Path> jarFiles,
+			Collection<URL> classPaths) {
+
+		this.miniCluster = Preconditions.checkNotNull(miniCluster);
+		this.jarFiles = Preconditions.checkNotNull(jarFiles);
+		this.classPaths = Preconditions.checkNotNull(classPaths);
 
-	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
-		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}
+
+	public TestStreamEnvironment(
+			LocalFlinkMiniCluster miniCluster,
+			int parallelism) {
+		this(miniCluster, parallelism, Collections.<Path>emptyList(), Collections.<URL>emptyList());
+	}
+
 	
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		final StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 		final JobGraph jobGraph = streamGraph.getJobGraph();
-		return executor.submitJobAndWait(jobGraph, false);
+
+		for (Path jarFile: jarFiles) {
+			jobGraph.addJar(jarFile);
+		}
+
+		jobGraph.setClasspaths(new ArrayList<>(classPaths));
+
+		return miniCluster.submitJobAndWait(jobGraph, false);
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
-	 * the given cluster with the given default parallelism.
-	 * 
+	 * the given cluster with the given default parallelism and the specified jar files and class
+	 * paths.
+	 *
 	 * @param cluster The test cluster to run the test program on.
 	 * @param parallelism The default parallelism for the test programs.
+	 * @param jarFiles Additional jar files to execute the job with
+	 * @param classpaths Additional class paths to execute the job with
 	 */
-	public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
-		
+	public static void setAsContext(
+			final LocalFlinkMiniCluster cluster,
+			final int parallelism,
+			final Collection<Path> jarFiles,
+			final Collection<URL> classpaths) {
+
 		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
 			@Override
 			public StreamExecutionEnvironment createExecutionEnvironment() {
-				return new TestStreamEnvironment(cluster, parallelism);
+				return new TestStreamEnvironment(
+					cluster,
+					parallelism,
+					jarFiles,
+					classpaths);
 			}
 		};
 
@@ -70,6 +112,21 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+	 * the given cluster with the given default parallelism.
+	 * 
+	 * @param cluster The test cluster to run the test program on.
+	 * @param parallelism The default parallelism for the test programs.
+	 */
+	public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
+		setAsContext(
+			cluster,
+			parallelism,
+			Collections.<Path>emptyList(),
+			Collections.<URL>emptyList());
+	}
+
+	/**
 	 * Resets the streaming context environment to null.
 	 */
 	public static void unsetAsContext() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 2bcf66d..f06cdb7 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -111,7 +111,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			}
 			
 			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
+			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
 			env.getConfig().enableObjectReuse();
 			env.setAsContext();
 
@@ -163,7 +163,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			}
 
 			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
+			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
 			env.getConfig().disableObjectReuse();
 			env.setAsContext();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 2043cd0..ed141d1 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -86,7 +86,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 		
 		switch(mode){
 			case CLUSTER:
-				new TestEnvironment(cluster, 4).setAsContext();
+				new TestEnvironment(cluster, 4, false).setAsContext();
 				break;
 			case CLUSTER_OBJECT_REUSE:
 				new TestEnvironment(cluster, 4, true).setAsContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index aea8152..a30db40 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.CodeAnalysisMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -30,39 +31,71 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.util.Preconditions;
 
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link ExecutionEnvironment} implementation which executes its jobs on a
+ * {@link LocalFlinkMiniCluster}.
+ */
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final LocalFlinkMiniCluster executor;
+	private final LocalFlinkMiniCluster miniCluster;
 
-	private TestEnvironment lastEnv = null;
+	private final Collection<Path> jarFiles;
 
-	@Override
-	public JobExecutionResult getLastJobExecutionResult() {
-		if (lastEnv == null) {
-			return this.lastJobExecutionResult;
-		}
-		else {
-			return lastEnv.getLastJobExecutionResult();
-		}
-	}
+	private final Collection<URL> classPaths;
+
+	private TestEnvironment lastEnv;
+
+	public TestEnvironment(
+			LocalFlinkMiniCluster miniCluster,
+			int parallelism,
+			boolean isObjectReuseEnabled,
+			Collection<Path> jarFiles,
+			Collection<URL> classPaths) {
+		this.miniCluster = Preconditions.checkNotNull(miniCluster);
+		this.jarFiles = Preconditions.checkNotNull(jarFiles);
+		this.classPaths = Preconditions.checkNotNull(classPaths);
 
-	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
-		this.executor = executor;
 		setParallelism(parallelism);
 
 		// disabled to improve build time
 		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
-	}
-
-	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
-		this(executor, parallelism);
 
 		if (isObjectReuseEnabled) {
 			getConfig().enableObjectReuse();
 		} else {
 			getConfig().disableObjectReuse();
 		}
+
+		lastEnv = null;
+	}
+
+	public TestEnvironment(
+			LocalFlinkMiniCluster executor,
+			int parallelism,
+			boolean isObjectReuseEnabled) {
+		this(
+			executor,
+			parallelism,
+			isObjectReuseEnabled,
+			Collections.<Path>emptyList(),
+			Collections.<URL>emptyList());
+	}
+
+	@Override
+	public JobExecutionResult getLastJobExecutionResult() {
+		if (lastEnv == null) {
+			return lastJobExecutionResult;
+		}
+		else {
+			return lastEnv.getLastJobExecutionResult();
+		}
 	}
 
 	@Override
@@ -76,7 +109,13 @@ public class TestEnvironment extends ExecutionEnvironment {
 		JobGraphGenerator jgg = new JobGraphGenerator();
 		JobGraph jobGraph = jgg.compileJobGraph(op);
 
-		this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
+		for (Path jarFile: jarFiles) {
+			jobGraph.addJar(jarFile);
+		}
+
+		jobGraph.setClasspaths(new ArrayList<>(classPaths));
+
+		this.lastJobExecutionResult = miniCluster.submitJobAndWait(jobGraph, false);
 		return this.lastJobExecutionResult;
 	}
 
@@ -93,7 +132,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 	private OptimizedPlan compileProgram(String jobName) {
 		Plan p = createProgramPlan(jobName);
 
-		Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+		Optimizer pc = new Optimizer(new DataStatistics(), this.miniCluster.configuration());
 		return pc.compile(p);
 	}
 
@@ -101,11 +140,65 @@ public class TestEnvironment extends ExecutionEnvironment {
 		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {
-				lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled());
+				lastEnv = new TestEnvironment(miniCluster, getParallelism(), getConfig().isObjectReuseEnabled());
 				return lastEnv;
 			}
 		};
 
 		initializeContextEnvironment(factory);
 	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the current {@link ExecutionEnvironment} to be a {@link TestEnvironment}. The test
+	 * environment executes the given jobs on a Flink mini cluster with the given default
+	 * parallelism and the additional jar files and class paths.
+	 *
+	 * @param miniCluster The mini cluster on which to execute the jobs
+	 * @param parallelism The default parallelism
+	 * @param jarFiles Additional jar files to execute the job with
+	 * @param classPaths Additional class paths to execute the job with
+	 */
+	public static void setAsContext(
+		final LocalFlinkMiniCluster miniCluster,
+		final int parallelism,
+		final Collection<Path> jarFiles,
+		final Collection<URL> classPaths) {
+
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return new TestEnvironment(
+					miniCluster,
+					parallelism,
+					false, jarFiles,
+					classPaths
+				);
+			}
+		};
+
+		initializeContextEnvironment(factory);
+	}
+
+
+	/**
+	 * Sets the current {@link ExecutionEnvironment} to be a {@link TestEnvironment}. The test
+	 * environment executes the given jobs on a Flink mini cluster with the given default
+	 * parallelism and the additional jar files and class paths.
+	 *
+	 * @param miniCluster The mini cluster on which to execute the jobs
+	 * @param parallelism The default parallelism
+	 */
+	public static void setAsContext(final LocalFlinkMiniCluster miniCluster, final int parallelism) {
+		setAsContext(
+			miniCluster,
+			parallelism,
+			Collections.<Path>emptyList(),
+			Collections.<URL>emptyList());
+	}
+
+	public static void unsetAsContext() {
+		resetContextEnvironment();
+	}
 }