You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:47 UTC

[2/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 934a795..6abea2a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		final int flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index ee415d1..29b3a3e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.junit.JUnitSuiteLike
 
@@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase
   with BeforeAndAfterAll {
 
   val parallelism = 4
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
 
   override protected def beforeAll(): Unit = {
     val cluster = Some(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 2ab52b5..18ecfde 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -79,153 +79,4 @@ under the License.
 		</dependency>
 
 	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies 
-						on scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that 
-						dependencies on scala classes can be resolved later in the (Java) test-compile 
-						phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-
-		</plugins>
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											net.alchim31.maven
-										</groupId>
-										<artifactId>
-											scala-maven-plugin
-										</artifactId>
-										<versionRange>
-											[3.1.4,)
-										</versionRange>
-										<goals>
-											<goal>compile</goal>
-											<goal>testCompile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index c5fbaf0..a478908 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
@@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 
 	protected static final int DEFAULT_PARALLELISM = 4;
 
-	protected static ForkableFlinkMiniCluster cluster;
+	protected static LocalFlinkMiniCluster cluster;
 
 	public StreamingMultipleProgramsTestBase() {
 		super(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index c700102..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions;
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 	/** The mini cluster in which this environment executes its jobs */
-	private ForkableFlinkMiniCluster executor;
+	private LocalFlinkMiniCluster executor;
 	
 
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
 		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}
@@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param cluster The test cluster to run the test program on.
 	 * @param parallelism The default parallelism for the test programs.
 	 */
-	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+	public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
 		
 		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c2da691..316fd21 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	protected int numTaskManagers = 1;
 	
 	/** The mini cluster that runs the test programs */
-	protected ForkableFlinkMiniCluster executor;
+	protected LocalFlinkMiniCluster executor;
 	
 
 	public AbstractTestBase(Configuration config) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index d7f09bd..4e83245 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
@@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
 	protected static boolean startWebServer = false;
 
-	protected static ForkableFlinkMiniCluster cluster = null;
+	protected static LocalFlinkMiniCluster cluster = null;
 	
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 4014b80..b774f97 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger {
 	}
 	
 	
-	public static ForkableFlinkMiniCluster startCluster(
+	public static LocalFlinkMiniCluster startCluster(
 		int numTaskManagers,
 		int taskManagerNumSlots,
 		boolean startWebserver,
@@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
 		return startCluster(config, singleActorSystem);
 	}
 
-	public static ForkableFlinkMiniCluster startCluster(
+	public static LocalFlinkMiniCluster startCluster(
 		Configuration config,
 		boolean singleActorSystem) throws Exception {
 
@@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger {
 		
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 
-		ForkableFlinkMiniCluster cluster =  new ForkableFlinkMiniCluster(config, singleActorSystem);
+		LocalFlinkMiniCluster cluster =  new LocalFlinkMiniCluster(config, singleActorSystem);
 
 		cluster.start();
 
@@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger {
 	}
 
 
-	public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+	public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
 		if (logDir != null) {
 			FileUtils.deleteDirectory(logDir);
 		}
@@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger {
 				List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
 
 				for (ActorRef tm : tms) {
-					bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-							.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
-					numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-							.RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+					bcVariableManagerResponseFutures.add(Patterns.ask(
+						tm,
+						TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
+						new Timeout(timeout)));
+
+					numActiveConnectionsResponseFutures.add(Patterns.ask(
+						tm,
+						TaskManagerMessages.getRequestNumActiveConnections(),
+						new Timeout(timeout)));
 				}
 
 				Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
@@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger {
 				Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
 
 				for (Object response : responses) {
-					numUnreleasedBCVars += ((TestingTaskManagerMessages
-							.ResponseBroadcastVariablesWithReferences) response).number();
+					numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
 				}
 
 				Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
@@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger {
 				responses = Await.result(numActiveConnectionsFutureResponses, timeout);
 
 				for (Object response : responses) {
-					numActiveConnections += ((TestingTaskManagerMessages
-							.ResponseNumActiveConnections) response).number();
+					numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7cb88be..aea8152 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final ForkableFlinkMiniCluster executor;
+	private final LocalFlinkMiniCluster executor;
 
 	private TestEnvironment lastEnv = null;
 
@@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		}
 	}
 
-	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
 
@@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
 	}
 
-	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
 		this(executor, parallelism);
 
 		if (isObjectReuseEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
deleted file mode 100644
index fa3135a..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.Patterns._
-import akka.pattern.ask
-
-import org.apache.curator.test.TestingCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-/**
- * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
- * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
- * uses it to avoid port conflicts.
- *
- * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
- *                          same [[ActorSystem]], otherwise false.
- */
-class ForkableFlinkMiniCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean)
-  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
-
-  def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
-  // --------------------------------------------------------------------------
-
-  var zookeeperCluster: Option[TestingCluster] = None
-
-  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
-    val forkNumberString = System.getProperty("forkNumber")
-
-    val forkNumber = try {
-      Integer.parseInt(forkNumberString)
-    }
-    catch {
-      case e: NumberFormatException => -1
-    }
-
-    val config = userConfiguration.clone()
-
-    if (forkNumber != -1) {
-      val jobManagerRPC = 1024 + forkNumber*400
-      val taskManagerRPC = 1024 + forkNumber*400 + 100
-      val taskManagerData = 1024 + forkNumber*400 + 200
-      val resourceManagerRPC = 1024 + forkNumber*400 + 300
-
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
-    }
-
-    super.generateConfiguration(config)
-  }
-
-  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val jobManagerName = getJobManagerName(index)
-    val archiveName = getArchiveName(index)
-
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if (jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
-    }
-
-    val (jobManager, _) = JobManager.startJobManagerActors(
-      config,
-      actorSystem,
-      Some(jobManagerName),
-      Some(archiveName),
-      classOf[TestingJobManager],
-      classOf[TestingMemoryArchivist])
-
-    jobManager
-  }
-
-  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val resourceManagerName = getResourceManagerName(index)
-
-    val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
-    if (resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
-    }
-
-    val resourceManager = FlinkResourceManager.startResourceManagerActors(
-      config,
-      system,
-      createLeaderRetrievalService(),
-      classOf[TestingResourceManager],
-      resourceManagerName)
-
-    resourceManager
-  }
-
-  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val rpcPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
-    val dataPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    if (rpcPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
-    }
-    if (dataPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
-    }
-
-    val localExecution = numTaskManagers == 1
-
-    TaskManager.startTaskManagerComponentsAndActor(
-      config,
-      ResourceID.generate(),
-      system,
-      hostname,
-      Some(TaskManager.TASK_MANAGER_NAME + index),
-      Some(createLeaderRetrievalService()),
-      localExecution,
-      classOf[TestingTaskManager])
-  }
-
-  def addTaskManager(): Unit = {
-    if (useSingleActorSystem) {
-      (jobManagerActorSystems, taskManagerActors) match {
-        case (Some(jmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been started properly.")
-      }
-    } else {
-      (taskManagerActorSystems, taskManagerActors) match {
-        case (Some(tmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          val newTmSystem = startTaskManagerActorSystem(index)
-          val newTmActor = startTaskManager(index, newTmSystem)
-
-          taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
-          taskManagerActors = Some(tmActors :+ newTmActor)
-
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been started properly.")
-      }
-    }
-  }
-
-  def restartLeadingJobManager(): Unit = {
-    this.synchronized {
-      (jobManagerActorSystems, jobManagerActors) match {
-        case (Some(jmActorSystems), Some(jmActors)) =>
-          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
-          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
-
-          clearLeader()
-
-          val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-          Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-          if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
-          }
-
-          val newJobManagerActorSystem = if(!singleActorSystem) {
-            startJobManagerActorSystem(index)
-          } else {
-            jmActorSystems.head
-          }
-
-          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
-
-          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
-          jobManagerActorSystems = Some(jmActorSystems.patch(
-            index,
-            Seq(newJobManagerActorSystem),
-            1))
-
-          val lrs = createLeaderRetrievalService()
-
-          jobManagerLeaderRetrievalService = Some(lrs)
-          lrs.start(this)
-
-        case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
-          "been started properly.")
-      }
-    }
-  }
-
-
-  def restartTaskManager(index: Int): Unit = {
-    (taskManagerActorSystems, taskManagerActors) match {
-      case (Some(tmActorSystems), Some(tmActors)) =>
-        val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-        Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-        if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
-        }
-
-        val taskManagerActorSystem  = if(!singleActorSystem) {
-          startTaskManagerActorSystem(index)
-        } else {
-          tmActorSystems.head
-        }
-
-        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
-
-        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
-        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
-
-      case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
-        "been started properly.")
-    }
-  }
-
-  override def start(): Unit = {
-    val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
-
-    zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
-      zookeeperURL.equals("")) {
-      LOG.info("Starting ZooKeeper cluster.")
-
-      val testingCluster = new TestingCluster(1)
-
-      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-        testingCluster.getConnectString)
-
-      testingCluster.start()
-
-      Some(testingCluster)
-    } else {
-      None
-    }
-
-    super.start()
-  }
-
-  override def stop(): Unit = {
-    super.stop()
-
-    zookeeperCluster.foreach{
-      LOG.info("Stopping ZooKeeper cluster.")
-      _.close()
-    }
-  }
-
-  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
-    val futures = taskManagerActors.map {
-      _.map {
-        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
-      }
-    }.getOrElse(Seq())
-
-    try {
-      Await.ready(Future.sequence(futures), timeout)
-    } catch {
-      case t: TimeoutException =>
-        throw new Exception("Timeout while waiting for TaskManagers to register at " +
-          s"${jobManager.path}")
-    }
-
-  }
-}
-
-object ForkableFlinkMiniCluster {
-
-  val MAX_RESTART_DURATION = 2 minute
-
-  val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
-
-  def startCluster(
-                    numSlots: Int,
-                    numTaskManagers: Int,
-                    timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
-  : ForkableFlinkMiniCluster = {
-
-    val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
-    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-
-    val cluster = new ForkableFlinkMiniCluster(config)
-
-    cluster.start()
-
-    cluster
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cac8451..cc70fee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,7 +44,7 @@ import static org.junit.Assert.fail;
  */
 public class AccumulatorErrorITCase {
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -53,7 +53,7 @@ public class AccumulatorErrorITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49e18e0..624bfff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -234,7 +234,6 @@ public class AccumulatorLiveITCase {
 				fail("Wrong accumulator results when map task begins execution.");
 			}
 
-
 			int expectedAccVal = 0;
 
 			/* for mapper task */

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9671fce..8a08f15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling;
 import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.junit.After;
@@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger {
 
 	// --------------------------------------------------------------------------------------------
 	
-	protected ForkableFlinkMiniCluster executor;
+	protected LocalFlinkMiniCluster executor;
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
@@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
 
-		this.executor = new ForkableFlinkMiniCluster(config, false);
+		this.executor = new LocalFlinkMiniCluster(config, false);
 		this.executor.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 163fb42..94ff66f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 		config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
 		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index fa5339d..0aee128 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 	private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8915bff..7f1d7f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger {
 	private static int slotsPerTaskManager = 2;
 	private static int numSlots = numTaskManagers * slotsPerTaskManager;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static TestingCluster cluster;
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger {
 		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
 		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
 
-		cluster = new ForkableFlinkMiniCluster(config);
+		cluster = new TestingCluster(config);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 550ba75..7409fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
@@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger {
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 
 		try {
 			// Create a test actor system
@@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 
@@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("JobManager: " + jobManager + ".");
 
 			final Throwable[] error = new Throwable[1];
-			final ForkableFlinkMiniCluster finalFlink = flink;
+			final TestingCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
 			new JavaTestKit(testActorSystem) {{
 
@@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger {
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 		List<File> checkpointFiles = new ArrayList<>();
 
 		try {
@@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 
@@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger {
 		// Test deadline
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 
 		try {
 			// Flink configuration
@@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index cf15052..6bf511f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
 import org.apache.flink.util.TestLogger;
@@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 	private static final int NUM_TASK_SLOTS = 3;
 	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 			config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 67c05e5..5f6cd4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	protected static final int NUM_TASK_SLOTS = 4;
 	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 			
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2e6ce78..e424a8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8b56d3d..7afafe4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.classloading;
 
-import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -37,9 +36,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-	private static ForkableFlinkMiniCluster testCluster;
+	private static TestingCluster testCluster;
 
 	private static int parallelism;
 
@@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger {
 		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
-		testCluster = new ForkableFlinkMiniCluster(config, false);
+		testCluster = new TestingCluster(config, false);
 		testCluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index c9059f1..a74ed34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-
 /**
  * Tests retrieval of a job from a running Flink cluster
  */
@@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger {
 
 	@BeforeClass
 	public static void before() {
-		cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+		cluster = new TestingCluster(new Configuration(), false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..178656d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase {
 	
 	private static final int NUM_SLOTS = 20;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	private static JobGraph workingJobGraph;
 
 	@BeforeClass
@@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			
-			cluster = new ForkableFlinkMiniCluster(config);
+			cluster = new LocalFlinkMiniCluster(config);
 
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index ca2c156..133ebd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger {
 	//  The mini cluster that is shared across tests
 	// ------------------------------------------------------------------------
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setup() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 34a7eed..e18e82a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,10 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase {
 
 	private static final String VALID_STARTUP_TIMEOUT = "100 s";
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setupCluster() {
@@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 09b5e7e..a67e6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -49,14 +49,14 @@ public class AutoParallelismITCase {
 	private static final int SLOTS_PER_TM = 7;
 	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setupCluster() {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index f30f61f..51f3534 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.types.Value;
 
 import org.junit.AfterClass;
@@ -43,7 +43,7 @@ public class CustomSerializationITCase {
 
 	private static final int PARLLELISM = 5;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -51,7 +51,7 @@ public class CustomSerializationITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 42419fb..06b93ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.apache.flink.util.Collector;
 
@@ -52,7 +52,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class MiscellaneousIssuesITCase {
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	
 	@BeforeClass
 	public static void startCluster() {
@@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 12b7a68..a43bab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 	
 	@Test
 	public void testSuccessfulProgramAfterFailure() {
-		ForkableFlinkMiniCluster cluster = null;
+		LocalFlinkMiniCluster cluster = null;
 		
 		try {
 			Configuration config = new Configuration();
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
 			
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40732df..b99858a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
@@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger {
 	 * Shared between all the test. Make sure to have at least NUM_SLOTS
 	 * available after your test finishes, e.g. cancel the job you submitted.
 	 */
-	private static ForkableFlinkMiniCluster cluster;
+	private static TestingCluster cluster;
 
 	@BeforeClass
 	public static void setup() {
@@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger {
 			config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1);
 			config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new TestingCluster(config, false);
 			cluster.start(true);
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a45d62..8a43ee4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 		
-		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 		
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 0c5d14b..a0d6b58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas
 		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
 		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 6355a8f..f09efc5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover
 		config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
 		config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
index 004340c..bf7c524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -42,7 +42,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public abstract class SimpleRecoveryITCaseBase {
 
-	protected static ForkableFlinkMiniCluster cluster;
+	protected static LocalFlinkMiniCluster cluster;
 
 	@AfterClass
 	public static void teardownCluster() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 6c621ac..5d29905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase {
 
 		final int PARALLELISM = 4;
 
-		ForkableFlinkMiniCluster cluster = null;
+		LocalFlinkMiniCluster cluster = null;
 		ActorSystem additionalSystem = null;
 
 		try {
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase {
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
 			config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 7710f06..0b008eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
@@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 
 		
 		
-		ForkableFlinkMiniCluster flink = null;
+		LocalFlinkMiniCluster flink = null;
 		try {
 			final String addressString = ipv6address.getHostAddress();
 			log.info("Test will use IPv6 address " + addressString + " for connection tests");
@@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 			
-			flink = new ForkableFlinkMiniCluster(conf, false);
+			flink = new LocalFlinkMiniCluster(conf, false);
 			flink.start();
 
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());