Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:11 UTC

[06/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 56d1525..25d9228 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -48,39 +49,30 @@ public class AccumulatorErrorITCase extends TestLogger {
 
 	private static LocalFlinkMiniCluster cluster;
 
+	private static ExecutionEnvironment env;
+
 	@BeforeClass
 	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-			cluster = new LocalFlinkMiniCluster(config, false);
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
+		cluster.start();
+
+		env = new TestEnvironment(cluster, 6, false);
 	}
 
 	@AfterClass
 	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
+		cluster.shutdown();
+		cluster = null;
 	}
 
 	@Test
 	public void testFaultyAccumulator() throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 		env.getConfig().disableSysoutLogging();
 
 		// Test Exception forwarding with faulty Accumulator implementation
@@ -93,11 +85,9 @@ public class AccumulatorErrorITCase extends TestLogger {
 		try {
 			env.execute();
 			fail("Should have failed.");
-		} catch (ProgramInvocationException e) {
-			Assert.assertTrue("Exception should be passed:",
-					e.getCause() instanceof JobExecutionException);
+		} catch (JobExecutionException e) {
 			Assert.assertTrue("Root cause should be:",
-					e.getCause().getCause() instanceof CustomException);
+					e.getCause() instanceof CustomException);
 		}
 	}
 
@@ -105,7 +95,6 @@ public class AccumulatorErrorITCase extends TestLogger {
 	@Test
 	public void testInvalidTypeAccumulator() throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
 		env.getConfig().disableSysoutLogging();
 
 		// Test Exception forwarding with faulty Accumulator implementation
@@ -119,13 +108,11 @@ public class AccumulatorErrorITCase extends TestLogger {
 		try {
 			env.execute();
 			fail("Should have failed.");
-		} catch (ProgramInvocationException e) {
-			Assert.assertTrue("Exception should be passed:",
-					e.getCause() instanceof JobExecutionException);
+		} catch (JobExecutionException e) {
 			Assert.assertTrue("Root cause should be:",
-					e.getCause().getCause() instanceof Exception);
+					e.getCause() instanceof Exception);
 			Assert.assertTrue("Root cause should be:",
-					e.getCause().getCause().getCause() instanceof UnsupportedOperationException);
+					e.getCause().getCause() instanceof UnsupportedOperationException);
 		}
 	}
 

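[Editor's note] The batch integration tests below all move to the same pattern: a single TestEnvironment is created against the LocalFlinkMiniCluster in @BeforeClass and shared by the test methods, replacing the per-test ExecutionEnvironment.createRemoteEnvironment("localhost", port) calls, and job failures now surface as JobExecutionException directly instead of being wrapped in a ProgramInvocationException. A minimal sketch of that shape, assuming the flink-test-utils classes and the constructor signatures shown in the diff (the class name and job body are illustrative only):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class SharedEnvironmentExampleITCase {

	private static LocalFlinkMiniCluster cluster;

	private static ExecutionEnvironment env;

	@BeforeClass
	public static void startCluster() {
		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
		cluster = new LocalFlinkMiniCluster(config, false);
		cluster.start();

		// One environment, bound to the shared mini cluster, reused by all test methods.
		env = new TestEnvironment(cluster, 6, false);
	}

	@AfterClass
	public static void shutdownCluster() {
		cluster.shutdown();
		cluster = null;
	}

	@Test
	public void testFailureSurfacesAsJobExecutionException() throws Exception {
		env.getConfig().disableSysoutLogging();

		env.generateSequence(0, 100)
			.map(new MapFunction<Long, Long>() {
				@Override
				public Long map(Long value) {
					throw new RuntimeException("expected failure");
				}
			})
			.output(new DiscardingOutputFormat<Long>());

		try {
			env.execute();
			fail("Should have failed.");
		} catch (JobExecutionException e) {
			// No ProgramInvocationException wrapper anymore; the job failure is the direct cause.
			assertTrue(e.getCause() instanceof RuntimeException);
		}
	}
}
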
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index e8ceeba..d91c57f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -36,13 +36,13 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -81,6 +81,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	private static LocalFlinkMiniCluster cluster;
 
+	private static TestStreamEnvironment env;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -104,6 +106,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
+
+		env = new TestStreamEnvironment(cluster, PARALLELISM);
 	}
 
 	@AfterClass
@@ -153,9 +157,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		FailingSource.reset();
 		
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-			
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -226,9 +227,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setMaxParallelism(maxParallelism);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -296,9 +294,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setMaxParallelism(2 * PARALLELISM);
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -361,9 +356,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -434,9 +426,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);

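[Editor's note] The streaming checkpointing tests apply the same idea with TestStreamEnvironment: the environment is constructed once against the mini cluster and reused by every test method, so the repeated StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()) calls disappear. A rough sketch of that skeleton, assuming the TestStreamEnvironment(cluster, parallelism) constructor used above (the pipeline itself is illustrative):

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class SharedStreamEnvironmentExampleITCase {

	private static final int PARALLELISM = 4;

	private static LocalFlinkMiniCluster cluster;

	private static TestStreamEnvironment env;

	@BeforeClass
	public static void startTestCluster() {
		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
		cluster = new LocalFlinkMiniCluster(config, false);
		cluster.start();

		// Shared streaming environment targeting the mini cluster.
		env = new TestStreamEnvironment(cluster, PARALLELISM);
	}

	@AfterClass
	public static void stopTestCluster() {
		cluster.shutdown();
	}

	@Test
	public void testCheckpointedJob() throws Exception {
		env.setParallelism(PARALLELISM);
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.enableCheckpointing(100);
		env.getConfig().disableSysoutLogging();

		env.fromElements(1, 2, 3).print();

		env.execute("shared environment example");
	}
}
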
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 3345b9c..a573be6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -30,13 +30,13 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -65,6 +65,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static LocalFlinkMiniCluster cluster;
 
+	private static TestStreamEnvironment env;
+
 
 	@BeforeClass
 	public static void startTestCluster() {
@@ -76,6 +78,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
+
+		env = new TestStreamEnvironment(cluster, PARALLELISM);
 	}
 
 	@AfterClass
@@ -95,9 +99,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 		
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-			
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -159,9 +160,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -220,9 +218,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -292,9 +287,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
@@ -364,9 +356,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 2839bc1..5f56def 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -95,8 +96,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	@Test
 	public void runCheckpointedProgram() throws Exception {
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
+			TestStreamEnvironment env = new TestStreamEnvironment(cluster, PARALLELISM);
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(500);
 			env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 56d8c66..7004f75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -30,12 +30,12 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -76,6 +76,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	private static LocalFlinkMiniCluster cluster;
 
+	private static TestStreamEnvironment env;
+
 
 	@BeforeClass
 	public static void startTestCluster() {
@@ -86,6 +88,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
+
+		env = new TestStreamEnvironment(cluster, PARALLELISM);
 	}
 
 	@AfterClass
@@ -103,9 +107,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 		
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-			
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
@@ -161,9 +162,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-			
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
@@ -219,9 +217,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);
@@ -267,9 +262,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 		FailingSource.reset();
 
 		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(timeCharacteristic);
 			env.getConfig().setAutoWatermarkInterval(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index f25a302..75eb112 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -38,8 +38,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -54,13 +56,14 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
+import java.io.IOException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
 
 public class ClassLoaderITCase extends TestLogger {
 
@@ -117,61 +120,69 @@ public class ClassLoaderITCase extends TestLogger {
 		}
 
 		FOLDER.delete();
+
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
 	}
 
 	@Test
-	public void testJobsWithCustomClassLoader() {
+	public void testJobsWithCustomClassLoader() throws IOException, ProgramInvocationException {
 		try {
 			int port = testCluster.getLeaderRPCPort();
 
-			PackagedProgram inputSplitTestProg = new PackagedProgram(
-					new File(INPUT_SPLITS_PROG_JAR_FILE),
-					new String[] { INPUT_SPLITS_PROG_JAR_FILE,
-							"", // classpath
-							"localhost",
-							String.valueOf(port),
-							"4" // parallelism
-					});
+			PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+			TestEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+				Collections.<URL>emptyList());
+
 			inputSplitTestProg.invokeInteractiveModeForExecution();
 
-			PackagedProgram streamingInputSplitTestProg = new PackagedProgram(
-					new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE),
-					new String[] { STREAMING_INPUT_SPLITS_PROG_JAR_FILE,
-							"localhost",
-							String.valueOf(port),
-							"4" // parallelism
-					});
+			PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+			TestStreamEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+				Collections.<URL>emptyList());
+
 			streamingInputSplitTestProg.invokeInteractiveModeForExecution();
 
-			String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
-			PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
-					new String[] { "",
-							classpath, // classpath
-							"localhost",
-							String.valueOf(port),
-							"4" // parallelism
-					});
+			URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+			PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+			TestEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.<Path>emptyList(),
+				Collections.singleton(classpath));
+
 			inputSplitTestProg2.invokeInteractiveModeForExecution();
 
 			// regular streaming job
-			PackagedProgram streamingProg = new PackagedProgram(
-					new File(STREAMING_PROG_JAR_FILE),
-					new String[] {
-							STREAMING_PROG_JAR_FILE,
-							"localhost",
-							String.valueOf(port)
-					});
+			PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+			TestStreamEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+				Collections.<URL>emptyList());
+
 			streamingProg.invokeInteractiveModeForExecution();
 
 			// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
 			// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
 			try {
-				PackagedProgram streamingCheckpointedProg = new PackagedProgram(
-						new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
-						new String[] {
-								STREAMING_CHECKPOINTED_PROG_JAR_FILE,
-								"localhost",
-								String.valueOf(port) });
+				PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+				TestStreamEnvironment.setAsContext(
+					testCluster,
+					parallelism,
+					Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+					Collections.<URL>emptyList());
+
 				streamingCheckpointedProg.invokeInteractiveModeForExecution();
 			} catch (Exception e) {
 				// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
@@ -182,14 +193,18 @@ public class ClassLoaderITCase extends TestLogger {
 
 			PackagedProgram kMeansProg = new PackagedProgram(
 					new File(KMEANS_JAR_PATH),
-					new String[] { KMEANS_JAR_PATH,
-							"localhost",
-							String.valueOf(port),
-							"4", // parallelism
-							KMeansData.DATAPOINTS,
-							KMeansData.INITIAL_CENTERS,
-							"25"
+					new String[] {
+						KMeansData.DATAPOINTS,
+						KMeansData.INITIAL_CENTERS,
+						"25"
 					});
+
+			TestEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(KMEANS_JAR_PATH)),
+				Collections.<URL>emptyList());
+
 			kMeansProg.invokeInteractiveModeForExecution();
 
 			// test FLINK-3633
@@ -200,6 +215,12 @@ public class ClassLoaderITCase extends TestLogger {
 							String.valueOf(port),
 					});
 
+			TestEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+				Collections.<URL>emptyList());
+
 			userCodeTypeProg.invokeInteractiveModeForExecution();
 
 			File checkpointDir = FOLDER.newFolder();
@@ -208,18 +229,21 @@ public class ClassLoaderITCase extends TestLogger {
 			final PackagedProgram program = new PackagedProgram(
 					new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
 					new String[] {
-							CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH,
-							"localhost",
-							String.valueOf(port),
 							checkpointDir.toURI().toString(),
 							outputDir.toURI().toString()
 					});
 
+			TestStreamEnvironment.setAsContext(
+				testCluster,
+				parallelism,
+				Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+				Collections.<URL>emptyList());
+
 			program.invokeInteractiveModeForExecution();
 
 		} catch (Exception e) {
 			if (!(e.getCause().getCause() instanceof SuccessException)) {
-				fail(e.getMessage());
+				throw e;
 			}
 		}
 	}
@@ -231,23 +255,25 @@ public class ClassLoaderITCase extends TestLogger {
 	public void testDisposeSavepointWithCustomKvState() throws Exception {
 		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
 
-		int port = testCluster.getLeaderRPCPort();
-
 		File checkpointDir = FOLDER.newFolder();
 		File outputDir = FOLDER.newFolder();
 
 		final PackagedProgram program = new PackagedProgram(
 				new File(CUSTOM_KV_STATE_JAR_PATH),
 				new String[] {
-						CUSTOM_KV_STATE_JAR_PATH,
-						"localhost",
-						String.valueOf(port),
 						String.valueOf(parallelism),
 						checkpointDir.toURI().toString(),
 						"5000",
 						outputDir.toURI().toString()
 				});
 
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList()
+		);
+
 		// Execute detached
 		Thread invokeThread = new Thread(new Runnable() {
 			@Override
@@ -283,7 +309,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 			// Retry if job is not available yet
 			if (jobId == null) {
-				Thread.sleep(100);
+				Thread.sleep(100L);
 			}
 		}
 

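[Editor's note] ClassLoaderITCase follows a related pattern for packaged programs: instead of passing host, port and jar path as program arguments, the test installs a TestEnvironment or TestStreamEnvironment as the context environment (shipping the jar so user classes reach the task managers), invokes the program, and clears the context again afterwards. A condensed sketch of that flow, assuming the setAsContext/unsetAsContext signatures used in the diff; the jar path and method name are placeholders:

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.test.util.TestEnvironment;

import java.io.File;
import java.net.URL;
import java.util.Collections;

public class ContextEnvironmentExample {

	public static void runJarOnTestCluster(TestingCluster testCluster, int parallelism) throws Exception {
		// Placeholder path; in the real test this points at a pre-built user-code jar.
		String jarFile = "target/example-test-program.jar";

		PackagedProgram program = new PackagedProgram(new File(jarFile));

		// Route getExecutionEnvironment() inside the jar's main() to the test cluster,
		// shipping the jar itself so user classes are loadable on the task managers.
		TestEnvironment.setAsContext(
			testCluster,
			parallelism,
			Collections.singleton(new Path(jarFile)),
			Collections.<URL>emptyList());

		try {
			program.invokeInteractiveModeForExecution();
		} finally {
			// Restore the default context so later tests are unaffected.
			TestEnvironment.unsetAsContext();
		}
	}
}

CustomDistributionITCase further down uses the instance variant of the same mechanism: new TestEnvironment(cluster, 1, false) followed by setAsContext().
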
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 52a3ba8..795ae41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -41,14 +41,11 @@ public class CheckpointedStreamingProgram {
 	private static final int CHECKPOINT_INTERVALL = 100;
 	
 	public static void main(String[] args) throws Exception {
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(CHECKPOINT_INTERVALL);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 100L));
 		env.disableOperatorChaining();
 		
 		DataStream<String> text = env.addSource(new SimpleStringGenerator());

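[Editor's note] Correspondingly, the user-code jars invoked by these tests drop the jar/host/port arguments and simply ask for the context environment, so the same main() runs unchanged under the test harness (where setAsContext has installed a TestStreamEnvironment) and in a normal submission. A minimal sketch of such an entry point; the class name and pipeline are illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleTestProgram {

	public static void main(String[] args) throws Exception {
		// Picks up the TestStreamEnvironment installed via setAsContext(),
		// or a regular local/remote environment outside the tests.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.getConfig().disableSysoutLogging();
		env.enableCheckpointing(100);

		env.fromElements("a", "b", "c").print();

		env.execute("example classloading program");
	}
}
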
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index d3baa7d..a24a3a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -49,14 +49,11 @@ import java.util.concurrent.ThreadLocalRandom;
 public class CheckpointingCustomKvStateProgram {
 
 	public static void main(String[] args) throws Exception {
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		final String checkpointPath = args[3];
-		final String outputPath = args[4];
+		final String checkpointPath = args[0];
+		final String outputPath = args[1];
 		final int parallelism = 1;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setParallelism(parallelism);
 		env.getConfig().disableSysoutLogging();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index b18e8e8..2caa7cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.classloading.jar;
 
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.RemoteEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -42,14 +40,8 @@ import org.apache.flink.core.io.InputSplitAssigner;
 public class CustomInputSplitProgram {
 	
 	public static void main(String[] args) throws Exception {
-		final String[] jarFile = (args[0].equals(""))? null : new String[] { args[0] };
-		final URL[] classpath = (args[1].equals(""))? null : new URL[] { new URL(args[1]) };
-		final String host = args[2];
-		final int port = Integer.parseInt(args[3]);
-		final int parallelism = Integer.parseInt(args[4]);
-
-		RemoteEnvironment env = new RemoteEnvironment(host, port, null, jarFile, classpath);
-		env.setParallelism(parallelism);
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> data = env.createInput(new CustomInputFormat());

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 8de4797..cbd553c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -43,15 +43,12 @@ import java.util.concurrent.ThreadLocalRandom;
 public class CustomKvStateProgram {
 
 	public static void main(String[] args) throws Exception {
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		final int parallelism = Integer.parseInt(args[3]);
-		final String checkpointPath = args[4];
-		final int checkpointingInterval = Integer.parseInt(args[5]);
-		final String outputPath = args[6];
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		final int parallelism = Integer.parseInt(args[0]);
+		final String checkpointPath = args[1];
+		final int checkpointingInterval = Integer.parseInt(args[2]);
+		final String outputPath = args[3];
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(checkpointingInterval);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
index 785464a..b8e6c85 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
@@ -52,22 +52,15 @@ public class KMeansForTest {
 	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		if (args.length < 7) {
+		if (args.length < 3) {
 			throw new IllegalArgumentException("Missing parameters");
 		}
 
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
+		final String pointsData = args[0];
+		final String centersData = args[1];
+		final int numIterations = Integer.parseInt(args[2]);
 
-		final int parallelism = Integer.parseInt(args[3]);
-
-		final String pointsData = args[4];
-		final String centersData = args[5];
-		final int numIterations = Integer.parseInt(args[6]);
-
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
-		env.setParallelism(parallelism);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
 		// get input data

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
index 60253fa..210973f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -38,11 +38,7 @@ public class LegacyCheckpointedStreamingProgram {
 	private static final int CHECKPOINT_INTERVALL = 100;
 
 	public static void main(String[] args) throws Exception {
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		env.enableCheckpointing(CHECKPOINT_INTERVALL);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 0f0ee0c..e7bd522 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -42,18 +42,13 @@ import java.util.List;
 public class StreamingCustomInputSplitProgram {
 	
 	public static void main(String[] args) throws Exception {
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		final int parallelism = Integer.parseInt(args[3]);
-
-		Configuration config = new Configuration();
+				Configuration config = new Configuration();
 
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarFile);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		env.getConfig().disableSysoutLogging();
-		env.setParallelism(parallelism);
 
 		DataStream<Integer> data = env.createInput(new CustomInputFormat());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index a19d8f2..0fdc744 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -31,12 +31,7 @@ import org.apache.flink.util.Collector;
 public class StreamingProgram {
 	
 	public static void main(String[] args) throws Exception {
-		
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 		
 		DataStream<String> text = env.fromElements(WordCountData.TEXT).rebalance();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index a073cba..f12fd5f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -43,11 +43,7 @@ public class UserCodeType {
 	}
 
 	public static void main(String[] args) throws Exception {
-		String jarFile = args[0];
-		String host = args[1];
-		int port = Integer.parseInt(args[2]);
-
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> input = env.fromElements(1,2,3,4,5);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index a74ed34..61595f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -53,7 +53,8 @@ public class JobRetrievalITCase extends TestLogger {
 
 	@BeforeClass
 	public static void before() {
-		cluster = new TestingCluster(new Configuration(), false);
+		Configuration configuration = new Configuration();
+		cluster = new TestingCluster(configuration, false);
 		cluster.start();
 	}
 
@@ -72,7 +73,7 @@ public class JobRetrievalITCase extends TestLogger {
 
 		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
 
-		final ClusterClient client = new StandaloneClusterClient(cluster.configuration());
+		final ClusterClient client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
 
 		// acquire the lock to make sure that the job cannot complete until the job client
 		// has been attached in resumingThread
@@ -122,7 +123,7 @@ public class JobRetrievalITCase extends TestLogger {
 		try {
 			client.retrieveJob(jobID);
 			fail();
-		} catch (JobRetrievalException e) {
+		} catch (JobRetrievalException ignored) {
 			// this is what we want
 		}
 	}

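[Editor's note] The JobRetrievalITCase change reflects the point of FLINK-6078: the ZooKeeper-backed HighAvailabilityServices are owned by the cluster and handed to the client, so the client no longer builds (and on shutdown closes) its own services and their CuratorFramework. A small sketch of the client construction, assuming the two-argument StandaloneClusterClient constructor shown in the diff:

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.runtime.testingUtils.TestingCluster;

public class ClusterClientExample {

	public static ClusterClient createClient(TestingCluster cluster) throws Exception {
		// Reuse the cluster's HA services rather than creating a second set,
		// so shutting the client down does not close shared ZooKeeper clients.
		return new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
	}
}
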
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index 133ebd0..da92c05 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -66,7 +66,7 @@ public class CustomDistributionITCase extends TestLogger {
 
 	@Before
 	public void prepare() {
-		TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+		TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false);
 		clusterEnv.setAsContext();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index d9d1b42..0091571 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -39,47 +39,35 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class RemoteEnvironmentITCase extends TestLogger {
 
 	private static final int TM_SLOTS = 4;
 
-	private static final int NUM_TM = 1;
-
 	private static final int USER_DOP = 2;
 
 	private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
 
 	private static final String VALID_STARTUP_TIMEOUT = "100 s";
 
-	private static LocalFlinkMiniCluster cluster;
+	private static Configuration configuration;
+
+	private static StandaloneMiniCluster cluster;
+
 
 	@BeforeClass
-	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
+	public static void setupCluster() throws Exception {
+		configuration = new Configuration();
+
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+
+		cluster = new StandaloneMiniCluster(configuration);
 	}
 
 	@AfterClass
-	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-		}
+	public static void tearDownCluster() throws Exception {
+		cluster.close();
 	}
 
 	/**
@@ -91,8 +79,8 @@ public class RemoteEnvironmentITCase extends TestLogger {
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				cluster.hostname(),
-				cluster.getLeaderRPCPort(),
+				cluster.getHostname(),
+				cluster.getPort(),
 				config
 		);
 		env.getConfig().disableSysoutLogging();
@@ -116,8 +104,8 @@ public class RemoteEnvironmentITCase extends TestLogger {
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				cluster.hostname(),
-				cluster.getLeaderRPCPort(),
+				cluster.getHostname(),
+				cluster.getPort(),
 				config
 		);
 		env.setParallelism(USER_DOP);

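[Editor's note] RemoteEnvironmentITCase is the one test that keeps createRemoteEnvironment, since a remote environment is exactly what it exercises; it only swaps the LocalFlinkMiniCluster for a StandaloneMiniCluster with a stable hostname and port. A rough sketch of that usage, based on the accessors appearing in the diff (getHostname, getPort, close; per the diff the cluster needs no separate start() call):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;

public class RemoteEnvironmentExample {

	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

		// Constructed cluster is ready to accept jobs; torn down via close().
		StandaloneMiniCluster cluster = new StandaloneMiniCluster(configuration);
		try {
			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
					cluster.getHostname(),
					cluster.getPort(),
					new Configuration());
			env.getConfig().disableSysoutLogging();

			env.fromElements(1, 2, 3).print();
		} finally {
			cluster.close();
		}
	}
}
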
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 2910f06..eea2509 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -52,6 +53,8 @@ public class AutoParallelismITCase extends TestLogger {
 
 	private static LocalFlinkMiniCluster cluster;
 
+	private static TestEnvironment env;
+
 	@BeforeClass
 	public static void setupCluster() {
 		Configuration config = new Configuration();
@@ -60,6 +63,8 @@ public class AutoParallelismITCase extends TestLogger {
 		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
+
+		env = new TestEnvironment(cluster, NUM_TM * SLOTS_PER_TM, false);
 	}
 
 	@AfterClass
@@ -78,9 +83,6 @@ public class AutoParallelismITCase extends TestLogger {
 	@Test
 	public void testProgramWithAutoParallelism() {
 		try {
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
 			env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index b4a7f99..76480ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.test.misc;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
@@ -27,7 +26,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.types.Value;
 
 import org.apache.flink.util.TestLogger;
@@ -47,39 +48,28 @@ public class CustomSerializationITCase extends TestLogger {
 	
 	private static LocalFlinkMiniCluster cluster;
 
+	private static TestEnvironment env;
+
 	@BeforeClass
 	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
+		cluster = new LocalFlinkMiniCluster(config, false);
+		cluster.start();
+
+		env = new TestEnvironment(cluster, PARLLELISM, false);
 	}
 
 	@AfterClass
 	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
+		cluster.shutdown();
+		cluster = null;
 	}
 	
 	@Test
 	public void testIncorrectSerializer1() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-			
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
 			
@@ -96,8 +86,8 @@ public class CustomSerializationITCase extends TestLogger {
 			
 			env.execute();
 		}
-		catch (ProgramInvocationException e) {
-			Throwable rootCause = e.getCause().getCause();
+		catch (JobExecutionException e) {
+			Throwable rootCause = e.getCause();
 			assertTrue(rootCause instanceof IOException);
 			assertTrue(rootCause.getMessage().contains("broken serialization"));
 		}
@@ -110,9 +100,6 @@ public class CustomSerializationITCase extends TestLogger {
 	@Test
 	public void testIncorrectSerializer2() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
 
@@ -129,8 +116,8 @@ public class CustomSerializationITCase extends TestLogger {
 
 			env.execute();
 		}
-		catch (ProgramInvocationException e) {
-			Throwable rootCause = e.getCause().getCause();
+		catch (JobExecutionException e) {
+			Throwable rootCause = e.getCause();
 			assertTrue(rootCause instanceof IOException);
 			assertTrue(rootCause.getMessage().contains("broken serialization"));
 		}
@@ -143,9 +130,6 @@ public class CustomSerializationITCase extends TestLogger {
 	@Test
 	public void testIncorrectSerializer3() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
 
@@ -162,8 +146,8 @@ public class CustomSerializationITCase extends TestLogger {
 
 			env.execute();
 		}
-		catch (ProgramInvocationException e) {
-			Throwable rootCause = e.getCause().getCause();
+		catch (JobExecutionException e) {
+			Throwable rootCause = e.getCause();
 			assertTrue(rootCause instanceof IOException);
 			assertTrue(rootCause.getMessage().contains("broken serialization"));
 		}
@@ -176,9 +160,6 @@ public class CustomSerializationITCase extends TestLogger {
 	@Test
 	public void testIncorrectSerializer4() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index f885321..7dab0f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -23,15 +23,15 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 
 import org.apache.flink.util.TestLogger;
@@ -54,43 +54,34 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class MiscellaneousIssuesITCase extends TestLogger {
 
+	private static final int PARALLELISM = 6;
+
 	private static LocalFlinkMiniCluster cluster;
+
+	private static TestEnvironment env;
 	
 	@BeforeClass
 	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-			cluster = new LocalFlinkMiniCluster(config, false);
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
+		cluster.start();
+
+		env = new TestEnvironment(cluster, PARALLELISM, false);
 	}
 	
 	@AfterClass
 	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
+		cluster.shutdown();
+		cluster = null;
 	}
 	
 	@Test
 	public void testNullValues() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(1);
 			env.getConfig().disableSysoutLogging();
 
@@ -107,10 +98,9 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 				env.execute();
 				fail("this should fail due to null values.");
 			}
-			catch (ProgramInvocationException e) {
+			catch (JobExecutionException e) {
 				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getCause());
-				assertTrue(e.getCause().getCause() instanceof NullPointerException);
+				assertTrue(e.getCause() instanceof NullPointerException);
 			}
 		}
 		catch (Exception e) {
@@ -122,9 +112,6 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 	@Test
 	public void testDisjointDataflows() {
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(5);
 			env.getConfig().disableSysoutLogging();
 
@@ -145,9 +132,6 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 		final String ACC_NAME = "test_accumulator";
 		
 		try {
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
 			env.setParallelism(6);
 			env.getConfig().disableSysoutLogging();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 4f24452..a5103cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -34,14 +33,17 @@ import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
-	
+
+	private static final int PARALLELISM = 16;
 	@Test
 	public void testSuccessfulProgramAfterFailure() {
 		LocalFlinkMiniCluster cluster = null;
@@ -56,9 +58,11 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
+
+			TestEnvironment env = new TestEnvironment(cluster, PARALLELISM, false);
 			
 			try {
-				runConnectedComponents(cluster.getLeaderRPCPort());
+				runConnectedComponents(env);
 			}
 			catch (Exception e) {
 				e.printStackTrace();
@@ -66,15 +70,15 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 			}
 	
 			try {
-				runKMeans(cluster.getLeaderRPCPort());
+				runKMeans(env);
 				fail("This program execution should have failed.");
 			}
-			catch (ProgramInvocationException e) {
-				assertTrue(e.getCause().getCause().getMessage().contains("Insufficient number of network buffers"));
+			catch (JobExecutionException e) {
+				assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers"));
 			}
 	
 			try {
-				runConnectedComponents(cluster.getLeaderRPCPort());
+				runConnectedComponents(env);
 			}
 			catch (Exception e) {
 				e.printStackTrace();
@@ -92,10 +96,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 		}
 	}
 	
-	private static void runConnectedComponents(int jmPort) throws Exception {
+	private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
 		
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort);
-		env.setParallelism(16);
+		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
 		// read vertex and edge data
@@ -134,10 +137,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 		env.execute();
 	}
 
-	private static void runKMeans(int jmPort) throws Exception {
+	private static void runKMeans(ExecutionEnvironment env) throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jmPort);
-		env.setParallelism(16);
+		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
 		// get input data

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 3c8eb48..6c8e758 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -170,7 +170,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 
@@ -396,7 +398,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -461,7 +465,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -586,7 +592,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -679,7 +687,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -743,7 +753,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -844,7 +856,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+			cluster.configuration(),
+			cluster.highAvailabilityServices());
 
 		JobID jobId = null;
 		try {
@@ -1008,6 +1022,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	 */
 	private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>>
 			implements CheckpointListener {
+		private static final long serialVersionUID = -5744725196953582710L;
 
 		private final static AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
 		private final int numKeys;
@@ -1055,6 +1070,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
+		private static final long serialVersionUID = -6249227626701264599L;
+
 		@Override
 		public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception {
 			long acc = Long.valueOf(accumulator);
@@ -1064,6 +1081,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	private static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = -8651235077342052336L;
+
 		@Override
 		public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
 			value1.f1 += value2.f1;

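In the queryable state tests the only change is how the client is constructed: QueryableStateClient now takes the mini cluster's HighAvailabilityServices alongside its Configuration instead of being built from the configuration alone. The new call, repeated throughout the hunks above, as a sketch assuming a running cluster in scope:

    final QueryableStateClient client = new QueryableStateClient(
        cluster.configuration(),
        cluster.highAvailabilityServices());
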
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 27d1aa1..c7c07ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -26,9 +26,12 @@ import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -86,13 +89,14 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	protected static final int PARALLELISM = 4;
 
 	@Test
-	public void testTaskManagerProcessFailure() {
+	public void testTaskManagerProcessFailure() throws Exception {
 
 		final StringWriter processOutput1 = new StringWriter();
 		final StringWriter processOutput2 = new StringWriter();
 		final StringWriter processOutput3 = new StringWriter();
 
 		ActorSystem jmActorSystem = null;
+		HighAvailabilityServices highAvailabilityServices = null;
 		Process taskManagerProcess1 = null;
 		Process taskManagerProcess2 = null;
 		Process taskManagerProcess3 = null;
@@ -128,6 +132,13 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
 			jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
 			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
+			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(
@@ -135,6 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				jmActorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				JobManager.class,
 				MemoryArchivist.class)._1();
 
@@ -263,6 +275,10 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 					// we can ignore this
 				}
 			}
+
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
 		}
 	}
 

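The task manager process failure test now manages the HighAvailabilityServices lifecycle itself: it builds the services from the job manager configuration, passes them to JobManager.startJobManagerActors(...), and releases them in the cleanup path via closeAndCleanupAllData(). A minimal sketch of that lifecycle, assuming jmConfig is already populated with JobManagerOptions.ADDRESS and PORT as in the hunk above; the elided middle stands in for the actual test body:

    HighAvailabilityServices highAvailabilityServices = null;
    try {
        highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            jmConfig,
            TestingUtils.defaultExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

        // ... start the JobManager actors with highAvailabilityServices and run the test ...
    } finally {
        if (highAvailabilityServices != null) {
            highAvailabilityServices.closeAndCleanupAllData();
        }
    }
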
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index bba218f..6d53b9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,13 +36,13 @@ import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.TaskManagerProcess;
 import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -169,6 +171,11 @@ public class ChaosMonkeyITCase extends TestLogger {
 		// Task manager
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numberOfSlotsPerTaskManager);
 
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
 		ActorSystem testActorSystem = null;
 		LeaderRetrievalService leaderRetrievalService = null;
 		List<JobManagerProcess> jobManagerProcesses = new ArrayList<>();
@@ -187,7 +194,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 			testActorSystem = AkkaUtils.createDefaultActorSystem();
 
 			// Leader listener
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			TestingListener leaderListener = new TestingListener();
 			leaderRetrievalService.start(leaderListener);
 
@@ -358,6 +365,8 @@ public class ChaosMonkeyITCase extends TestLogger {
 			if (testActorSystem != null) {
 				testActorSystem.shutdown();
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index d80c826..0b49814 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -45,18 +46,19 @@ public class FastFailuresITCase extends TestLogger {
 	
 	@Test
 	public void testThis() {
+		final int parallelism = 4;
+
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 		
 		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
 
+		TestStreamEnvironment env = new TestStreamEnvironment(cluster, parallelism);
+		
 		env.getConfig().disableSysoutLogging();
-		env.setParallelism(4);
+		env.setParallelism(parallelism);
 		env.enableCheckpointing(1000);
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(210, 0));
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f910e49..6c70b87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,12 +36,12 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -170,6 +172,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
 		LeaderRetrievalService leaderRetrievalService = null;
 		ActorSystem taskManagerSystem = null;
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
@@ -187,16 +193,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			leaderRetrievalService.start(leaderListener);
 
 			// The task manager
 			taskManagerSystem = AkkaUtils.createActorSystem(
 					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
 			TaskManager.startTaskManagerComponentsAndActor(
-					config, ResourceID.generate(), taskManagerSystem, "localhost",
-					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
-					false, TaskManager.class);
+				config,
+				ResourceID.generate(),
+				taskManagerSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.<String>empty(),
+				false,
+				TaskManager.class);
 
 			{
 				// Initial submission
@@ -298,6 +309,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			if (testSystem != null) {
 				testSystem.shutdown();
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 
@@ -323,6 +336,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		LeaderRetrievalService leaderRetrievalService = null;
 		ActorSystem taskManagerSystem = null;
 		ActorSystem testActorSystem = null;
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 		try {
 			// Test actor system
@@ -338,16 +355,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			leaderRetrievalService.start(leaderListener);
 
 			// The task manager
 			taskManagerSystem = AkkaUtils.createActorSystem(
 					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
 			TaskManager.startTaskManagerComponentsAndActor(
-					config, ResourceID.generate(), taskManagerSystem, "localhost",
-					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
-					false, TaskManager.class);
+				config,
+				ResourceID.generate(),
+				taskManagerSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.<String>empty(),
+				false,
+				TaskManager.class);
 
 			// Get the leader
 			leaderListener.waitForNewLeader(testDeadline.timeLeft().toMillis());
@@ -453,6 +475,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			if (testActorSystem != null) {
 				testActorSystem.shutdown();
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 

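The JobManager HA recovery tests wire their task managers and leader listeners through the same services: TaskManager.startTaskManagerComponentsAndActor(...) gains a HighAvailabilityServices argument and loses the Option<LeaderRetrievalService> parameter, and the leader retriever is obtained from the services under HighAvailabilityServices.DEFAULT_JOB_ID. A sketch assembled from the hunks above, assuming config, leaderListener and highAvailabilityServices are set up as in the tests:

    ActorSystem taskManagerSystem = AkkaUtils.createActorSystem(
        config, Option.apply(new Tuple2<String, Object>("localhost", 0)));

    TaskManager.startTaskManagerComponentsAndActor(
        config,
        ResourceID.generate(),
        taskManagerSystem,
        highAvailabilityServices,
        "localhost",
        Option.<String>empty(),
        false,
        TaskManager.class);

    LeaderRetrievalService leaderRetrievalService =
        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
    leaderRetrievalService.start(leaderListener);
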
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index e4d0f65..052195a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -32,6 +32,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,12 +47,12 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.data.Stat;
@@ -183,6 +185,11 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		ActorSystem taskManagerSystem = null;
 
+		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+			config,
+			TestingUtils.defaultExecutor(),
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
 
@@ -199,15 +206,20 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			leaderRetrievalService.start(leaderListener);
 
 			// The task manager
 			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
 			TaskManager.startTaskManagerComponentsAndActor(
-					config, ResourceID.generate(), taskManagerSystem, "localhost",
-					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
-					false, TaskManager.class);
+				config,
+				ResourceID.generate(),
+				taskManagerSystem,
+				highAvailabilityServices,
+				"localhost",
+				Option.<String>empty(),
+				false,
+				TaskManager.class);
 
 			// Client test actor
 			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
@@ -327,6 +339,8 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 			if (testSystem != null) {
 				testSystem.shutdown();
 			}
+
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index b6a1bd4..5f9d178 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -34,16 +34,18 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -238,7 +240,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		// Task managers
 		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
 
-		// Leader election service
+		HighAvailabilityServices highAvailabilityServices = null;
+
 		LeaderRetrievalService leaderRetrievalService = null;
 
 		// Coordination between the processes goes through a directory
@@ -263,13 +266,22 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
+			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
 			// Start the task manager process
 			for (int i = 0; i < numberOfTaskManagers; i++) {
 				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
 				TaskManager.startTaskManagerComponentsAndActor(
-						config, ResourceID.generate(), tmActorSystem[i], "localhost",
-						Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
-						false, TaskManager.class);
+					config,
+					ResourceID.generate(),
+					tmActorSystem[i],
+					highAvailabilityServices,
+					"localhost",
+					Option.<String>empty(),
+					false,
+					TaskManager.class);
 			}
 
 			// Test actor system
@@ -279,7 +291,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 			leaderRetrievalService.start(leaderListener);
 
 			// Initial submission
@@ -378,6 +390,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 				}
 			}
 
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
+
 			// Delete coordination directory
 			if (coordinateTempDir != null) {
 				try {