You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:46 UTC

[1/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Repository: flink
Updated Branches:
  refs/heads/master 920cda408 -> 02b852e35


http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 9bd8cc3..2738d22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
-import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 	private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
 
-	private static final File tempDirectory;
+	private static TestingServer zkServer;
 
-	static {
-		try {
-			tempDirectory = org.apache.flink.runtime.testutils
-					.CommonTestUtils.createTempDirectory();
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Test setup failed", e);
-		}
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		zkServer = new TestingServer(true);
 	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		if (tempDirectory != null) {
-			FileUtils.deleteDirectory(tempDirectory);
+		if (zkServer != null) {
+			zkServer.close();
 		}
 	}
 
@@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
-		Configuration configuration = new Configuration();
+		File rootFolder = tempFolder.getRoot();
+
+		Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zkServer.getConnectString(),
+			rootFolder.getPath());
 
 		int numJMs = 10;
 		int numTMs = 3;
 
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
-		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+		TestingCluster cluster = new TestingCluster(configuration);
 
 		try {
 			cluster.start();
@@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		int numSlotsPerTM = 3;
 		int parallelism = numTMs * numSlotsPerTM;
 
-		Configuration configuration = new Configuration();
+		File rootFolder = tempFolder.getRoot();
+
+		Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zkServer.getConnectString(),
+			rootFolder.getPath());
 
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
-		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
@@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
 
-		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+		final TestingCluster cluster = new TestingCluster(configuration);
 
 		ActorSystem clientActorSystem = null;
 
@@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		boolean finished = false;
 
 		final ActorSystem clientActorSystem;
-		final ForkableFlinkMiniCluster cluster;
+		final LocalFlinkMiniCluster cluster;
 		final JobGraph graph;
 
 		final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>();
 
 		public JobSubmitterRunnable(
 				ActorSystem actorSystem,
-				ForkableFlinkMiniCluster cluster,
+				LocalFlinkMiniCluster cluster,
 				JobGraph graph) {
 			this.clientActorSystem = actorSystem;
 			this.cluster = cluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d693aaa..2ed759d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.util.TestLogger;
@@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger {
 	static MultiShotLatch latch;
 
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@Before
 	public void setupLatch() {
@@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index fc90994..a8482ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.flink.util.TestLogger;
@@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger {
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_SLOTS = 4;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	private static int port = -1;
 	
@@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger {
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 		
 		port = cluster.webMonitor().get().getServerPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac661f3..1b2838d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
-    val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    val cluster = new TestingCluster(config, singleActorSystem = false)
 
     cluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 258f6df..3b39b3f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
@@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val taskManagers = cluster.getTaskManagers
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
-    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    new TestingCluster(config, singleActorSystem = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8c211ef..ffdca36 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -48,6 +48,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<!-- Needed for the streaming wordcount example -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 0243012..31a3d98 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -48,7 +48,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
index f7bb0d4..0f7f6bb 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
  <!-- </check> -->
  <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
   <parameters>
-   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+   <parameter name="maxParameters"><![CDATA[15]]></parameter>
   </parameters>
  </check>
  <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->


[3/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
deleted file mode 100644
index a6963fe..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Terminated}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
-import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
-trait TestingTaskManagerLike extends FlinkActor {
-  that: TaskManager =>
-
-  import scala.collection.JavaConverters._
-
-  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-  val waitForRegisteredAtResourceManager =
-    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
-  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
-  /** Map of registered task submit listeners */
-  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  /**
-   * Handler for testing related messages
-   */
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case NotifyWhenTaskIsRunning(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
-          sender ! decorateMessage(true)
-
-        case _ =>
-          val listeners = waitForRunning.getOrElse(executionID, Set())
-          waitForRunning += (executionID -> (listeners + sender))
-      }
-
-    case RequestRunningTasks =>
-      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-
-    case NotifyWhenTaskRemoved(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(_) =>
-          val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender))
-        case None =>
-          if(unregisteredTasks.contains(executionID)) {
-            sender ! decorateMessage(true)
-          } else {
-            val set = waitForRemoval.getOrElse(executionID, Set())
-            waitForRemoval += (executionID -> (set + sender))
-          }
-      }
-
-    case TaskInFinalState(executionID) =>
-      super.handleMessage(TaskInFinalState(executionID))
-      waitForRemoval.remove(executionID) match {
-        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
-        case None =>
-      }
-
-      unregisteredTasks += executionID
-
-    case RequestBroadcastVariablesWithReferences =>
-      sender ! decorateMessage(
-        ResponseBroadcastVariablesWithReferences(
-          bcVarManager.getNumberOfVariablesWithReferences)
-      )
-
-    case RequestNumActiveConnections =>
-      val numActive = if (!network.isShutdown) {
-        network.getConnectionManager.getNumberOfActiveConnections
-      } else {
-        0
-      }
-      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
-    case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }else{
-        sender ! decorateMessage(true)
-      }
-
-    case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        sender ! decorateMessage(true)
-      } else {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }
-
-    case NotifyWhenJobManagerTerminated(jobManager) =>
-      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
-      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
-    case RegisterSubmitTaskListener(jobId) =>
-      registeredSubmitTaskListeners.put(jobId, sender())
-
-    case msg@SubmitTask(tdd) =>
-      registeredSubmitTaskListeners.get(tdd.getJobID) match {
-        case Some(listenerRef) =>
-          listenerRef ! ResponseSubmitTaskListener(tdd)
-        case None =>
-        // Nothing to do
-      }
-
-      super.handleMessage(msg)
-
-    /**
-     * Message from task manager that accumulator values changed and need to be reported immediately
-     * instead of lazily through the
-     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
-     * message to the job manager that it knows it should report to the listeners.
-     */
-    case msg: AccumulatorsChanged =>
-      currentJobManager match {
-        case Some(jobManager) =>
-          jobManager.forward(msg)
-          sendHeartbeatToJobManager()
-          sender ! true
-        case None =>
-      }
-
-    case msg@Terminated(jobManager) =>
-      super.handleMessage(msg)
-
-      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-        _ foreach {
-          _ ! decorateMessage(JobManagerTerminated(jobManager))
-        }
-      }
-
-    case msg:Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val jobManager = sender()
-
-        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-          _ foreach {
-            _ ! decorateMessage(JobManagerTerminated(jobManager))
-          }
-        }
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
-      super.handleMessage(msg)
-
-      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
-        waitForRunning.get(taskExecutionState.getID) foreach {
-          _ foreach (_ ! decorateMessage(true))
-        }
-      }
-
-    case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
-    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
-      if(isConnected && jobManager == currentJobManager.get) {
-        sender() ! true
-      } else {
-        val list = waitForRegisteredAtResourceManager.getOrElse(
-          jobManager,
-          Set[ActorRef]())
-
-        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
-      }
-
-    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
-      super.handleMessage(msg)
-
-      val jm = sender()
-
-      waitForRegisteredAtResourceManager.remove(jm).foreach {
-        listeners => listeners.foreach{
-          listener =>
-            listener ! true
-        }
-      }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
deleted file mode 100644
index 974e4e8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-/**
- * Additional messages that the [[TestingTaskManager]] understands.
- */
-object TestingTaskManagerMessages {
-  
-  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
-  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
-  
-  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
-    import collection.JavaConverters._
-    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
-  }
-  
-  case class ResponseBroadcastVariablesWithReferences(number: Int)
-
-  case object RequestNumActiveConnections
-  case class ResponseNumActiveConnections(number: Int)
-  
-  case object RequestRunningTasks
-  
-  case object RequestBroadcastVariablesWithReferences
-
-  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
-
-  case class JobManagerTerminated(jobManager: ActorRef)
-
-  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
-
-  /**
-   * Message to give a hint to the task manager that accumulator values were updated in the task.
-   * This message is forwarded to the job manager which knows that it needs to notify listeners
-   * of accumulator updates.
-   */
-  case class AccumulatorsChanged(jobID: JobID)
-
-  /**
-    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
-    * messages of the given job.
-    *
-    * If a task is submitted with the given job ID the task deployment
-    * descriptor is forwarded to the listener.
-    *
-    * @param jobId The job ID to listen for.
-    */
-  case class RegisterSubmitTaskListener(jobId: JobID)
-
-  /**
-    * A response to a listened job ID containing the submitted task deployment descriptor.
-    *
-    * @param tdd The submitted task deployment descriptor.
-    */
-  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
-
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-  
-  def getRequestRunningTasksMessage: AnyRef = {
-    RequestRunningTasks
-  }
-  
-  def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
-    RequestBroadcastVariablesWithReferences
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index e596166..c143fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -86,7 +85,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	@Override
 	public int getNumberOfJobManagers() {
-		return this.configuration().getInteger(
+		return this.originalConfiguration().getInteger(
 				ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
 				ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
new file mode 100644
index 0000000..495cacd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * A testing resource manager which may alter the default standalone resource manager's behavior.
+ */
+public class TestingResourceManager extends StandaloneResourceManager {
+
+	/** Set of Actors which want to be informed of a connection to the job manager */
+	private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
+
+	/** Set of Actors which want to be informed of a shutdown */
+	private Set<ActorRef> waitForShutdown = new HashSet<>();
+
+	/** Flag to signal a connection to the JobManager */
+	private boolean isConnected = false;
+
+	public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
+		super(flinkConfig, leaderRetriever);
+	}
+
+	/**
+	 * Handles the testing messages; all other messages are passed on to the parent class
+	 */
+	@Override
+	protected void handleMessage(Object message) {
+
+		if (message instanceof GetRegisteredResources) {
+			sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
+		} else if (message instanceof FailResource) {
+			ResourceID resourceID = ((FailResource) message).resourceID;
+			notifyWorkerFailed(resourceID, "Failed for test case.");
+
+		} else if (message instanceof NotifyWhenResourceManagerConnected) {
+			if (isConnected) {
+				sender().tell(
+					Messages.getAcknowledge(),
+					self());
+			} else {
+				waitForResourceManagerConnected.add(sender());
+			}
+		} else if (message instanceof RegisterResourceManagerSuccessful) {
+			super.handleMessage(message);
+
+			isConnected = true;
+
+			for (ActorRef ref : waitForResourceManagerConnected) {
+				ref.tell(
+					Messages.getAcknowledge(),
+					self());
+			}
+			waitForResourceManagerConnected.clear();
+
+		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
+			waitForShutdown.add(sender());
+		} else if (message instanceof TestingMessages.Alive$) {
+			sender().tell(Messages.getAcknowledge(), self());
+		} else {
+			super.handleMessage(message);
+		}
+	}
+
+	/**
+	 * Testing messages
+	 */
+	public static class GetRegisteredResources {}
+
+	public static class GetRegisteredResourcesReply {
+
+		public Collection<ResourceID> resources;
+
+		public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
+			this.resources = resources;
+		}
+
+	}
+
+	/**
+	 * Fails the single resource identified by the carried resource ID
+	 */
+	public static class FailResource {
+
+		public ResourceID resourceID;
+
+		public FailResource(ResourceID resourceID) {
+			this.resourceID = resourceID;
+		}
+	}
+
+	/**
+	 * The sender of this message will be informed of a connection to the Job Manager
+	 */
+	public static class NotifyWhenResourceManagerConnected {}
+
+	/**
+	 * Inform registered listeners about a shutdown of the application.
+     */
+	@Override
+	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+		for (ActorRef listener : waitForShutdown) {
+			listener.tell(new TestingMessages.ComponentShutdown(self()), self());
+		}
+		waitForShutdown.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c01a321 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,22 +18,32 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 
 import akka.pattern.ask
-import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.Patterns._
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, Future}
 
 /**
@@ -48,7 +58,7 @@ class TestingCluster(
     userConfiguration: Configuration,
     singleActorSystem: Boolean,
     synchronousDispatcher: Boolean)
-  extends FlinkMiniCluster(
+  extends LocalFlinkMiniCluster(
     userConfiguration,
     singleActorSystem) {
 
@@ -59,133 +69,54 @@ class TestingCluster(
 
   // --------------------------------------------------------------------------
 
-  override def generateConfiguration(userConfig: Configuration): Configuration = {
-    val cfg = new Configuration()
-    cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
-    cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-    cfg.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 0)
-    cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
-    cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
-
-    setDefaultCiConfig(cfg)
-
-    cfg.addAll(userConfig)
-    cfg
-  }
-
-  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val jobManagerName = if(singleActorSystem) {
-      JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
-    } else {
-      JobManager.JOB_MANAGER_NAME
-    }
-
-    val archiveName = if(singleActorSystem) {
-      JobManager.ARCHIVE_NAME + "_" + (index + 1)
-    } else {
-      JobManager.ARCHIVE_NAME
-    }
-
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if(jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
-    }
-
-    val (executionContext,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategyFactory,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobsGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricRegistry) = JobManager.createJobManagerComponents(
-      config,
-      createLeaderElectionService())
-
-    val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
-    val archive = actorSystem.actorOf(testArchiveProps, archiveName)
-
-    val jobManagerProps = Props(
-      new TestingJobManager(
-        configuration,
-        executionContext,
-        instanceManager,
-        scheduler,
-        libraryCacheManager,
-        archive,
-        restartStrategyFactory,
-        timeout,
-        leaderElectionService,
-        submittedJobsGraphs,
-        checkpointRecoveryFactory,
-        savepointStore,
-        jobRecoveryTimeout,
-        metricRegistry))
-
-    val dispatcherJobManagerProps = if (synchronousDispatcher) {
-      // disable asynchronous futures (e.g. accumulator update in Heartbeat)
-      jobManagerProps.withDispatcher(CallingThreadDispatcher.Id)
-    } else {
-      jobManagerProps
-    }
-
-    actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName)
-  }
-
-  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val resourceManagerName = if(singleActorSystem) {
-      FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+  override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager]
+
+  override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+    classOf[TestingResourceManager]
+
+  override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager]
+
+  override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist]
+
+  override def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry]): Props = {
+
+    val props = super.getJobManagerProps(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+
+    if (synchronousDispatcher) {
+      props.withDispatcher(CallingThreadDispatcher.Id)
     } else {
-      FlinkResourceManager.RESOURCE_MANAGER_NAME
+      props
     }
-
-    val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
-    if(resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
-    }
-
-    val testResourceManagerProps = Props(
-      new TestingResourceManager(
-        config,
-        createLeaderRetrievalService()
-      ))
-
-    system.actorOf(testResourceManagerProps, resourceManagerName)
-  }
-
-  override def startTaskManager(index: Int, system: ActorSystem) = {
-
-    val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
-
-    TaskManager.startTaskManagerComponentsAndActor(
-      configuration,
-      ResourceID.generate(),
-      system,
-      hostname,
-      Some(tmActorName),
-      Some(createLeaderRetrievalService()),
-      numTaskManagers == 1,
-      classOf[TestingTaskManager])
-  }
-
-
-  def createLeaderElectionService(): Option[LeaderElectionService] = {
-    None
   }
 
   @throws(classOf[TimeoutException])
@@ -228,4 +159,131 @@ class TestingCluster(
 
     Await.ready(combinedFuture, timeout)
   }
+
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val futures = taskManagerActors.map {
+      _.map {
+              tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+            }
+    }.getOrElse(Seq())
+
+    try {
+      Await.ready(Future.sequence(futures), timeout)
+    } catch {
+      case t: TimeoutException =>
+        throw new Exception("Timeout while waiting for TaskManagers to register at " +
+                              s"${jobManager.path}")
+    }
+
+  }
+
+  def restartLeadingJobManager(): Unit = {
+    this.synchronized {
+      (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(jmActorSystems), Some(jmActors)) =>
+          val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration))
+          val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration))
+
+          // restart the leading job manager with the same port
+          val port = getLeaderRPCPort
+          val oldPort = originalConfiguration.getInteger(
+            ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            0)
+
+          // temporarily set the leader's port in the configuration; it is used for startup
+          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+
+          clearLeader()
+
+          val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION)
+          Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+          if(!singleActorSystem) {
+            jmActorSystems(index).shutdown()
+            jmActorSystems(index).awaitTermination()
+          }
+
+          val newJobManagerActorSystem = if(!singleActorSystem) {
+            startJobManagerActorSystem(index)
+          } else {
+            jmActorSystems.head
+          }
+
+          // reset the original configuration
+          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+
+          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+          jobManagerActorSystems = Some(jmActorSystems.patch(
+            index,
+            Seq(newJobManagerActorSystem),
+            1))
+
+          val lrs = createLeaderRetrievalService()
+
+          jobManagerLeaderRetrievalService = Some(lrs)
+          lrs.start(this)
+
+        case _ => throw new Exception("The JobManager of the TestingCluster have not " +
+                                        "been started properly.")
+      }
+    }
+  }
+
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(tmActorSystems), Some(tmActors)) =>
+        val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION)
+        Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+        if(!singleActorSystem) {
+          tmActorSystems(index).shutdown()
+          tmActorSystems(index).awaitTermination()
+        }
+
+        val taskManagerActorSystem  = if(!singleActorSystem) {
+          startTaskManagerActorSystem(index)
+        } else {
+          tmActorSystems.head
+        }
+
+        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+      case _ => throw new Exception("The TaskManager of the TestingCluster have not " +
+                                      "been started properly.")
+    }
+  }
+
+  def addTaskManager(): Unit = {
+    if (useSingleActorSystem) {
+      (jobManagerActorSystems, taskManagerActors) match {
+        case (Some(jmSystems), Some(tmActors)) =>
+          val index = numTaskManagers
+          taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
+          numTaskManagers += 1
+        case _ => throw new IllegalStateException("Cluster has not been started properly.")
+      }
+    } else {
+      (taskManagerActorSystems, taskManagerActors) match {
+        case (Some(tmSystems), Some(tmActors)) =>
+          val index = numTaskManagers
+          val newTmSystem = startTaskManagerActorSystem(index)
+          val newTmActor = startTaskManager(index, newTmSystem)
+
+          taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
+          taskManagerActors = Some(tmActors :+ newTmActor)
+
+          numTaskManagers += 1
+        case _ => throw new IllegalStateException("Cluster has not been started properly.")
+      }
+    }
+  }
+}
+
+object TestingCluster {
+  val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
new file mode 100644
index 0000000..62349db
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** JobManager implementation extended by testing messages
+  *
+  */
+class TestingJobManager(
+    flinkConfiguration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore : SavepointStore,
+    jobRecoveryTimeout : FiniteDuration,
+    metricRegistry : Option[MetricRegistry])
+  extends JobManager(
+    flinkConfiguration,
+      executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricRegistry)
+  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..5ba2790
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Cancellable, Terminated}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import context._
+
+  import scala.collection.JavaConverters._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+    new Ordering[(Int, ActorRef)] {
+      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+    })
+
+  val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+  var disconnectDisabled = false
+
+  var postStopEnabled = true
+
+  abstract override def postStop(): Unit = {
+    if (postStopEnabled) {
+      super.postStop()
+    } else {
+      // only stop leader election service to revoke the leadership of this JM so that a new JM
+      // can be elected leader
+      leaderElectionService.stop()
+    }
+  }
+
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+          ExecutionGraphFound(
+            jobID,
+            executionGraph)
+        )
+
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if(checkIfAllVerticesRunning(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+      if(checkIfAllVerticesRunningOrFinished(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+
+    case NotifyListeners =>
+      for(jobID <- currentJobs.keySet){
+        notifyListeners(jobID)
+      }
+
+      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+        periodicCheck foreach { _.cancel() }
+        periodicCheck = None
+      }
+
+
+    case NotifyWhenJobRemoved(jobID) =>
+      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+
+      val responses = gateways.map{
+        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+      }
+
+      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+
+      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+      import context.dispatcher
+      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+
+    case CheckIfJobRemoved(jobID) =>
+      if(currentJobs.contains(jobID)) {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID))
+        )(context.dispatcher, sender())
+      } else {
+        sender() ! decorateMessage(true)
+      }
+
+    case NotifyWhenTaskManagerTerminated(taskManager) =>
+      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
+
+    case msg@Terminated(taskManager) =>
+      super.handleMessage(msg)
+
+      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+        _ foreach {
+          listener =>
+            listener ! decorateMessage(TaskManagerTerminated(taskManager))
+        }
+      }
+
+    // see shutdown method for reply
+    case NotifyOfComponentShutdown =>
+      waitForShutdown += sender()
+
+    case NotifyWhenAccumulatorChange(jobID) =>
+
+      val (updated, registered) = waitForAccumulatorUpdate.
+        getOrElse(jobID, (false, Set[ActorRef]()))
+      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
+      sender ! true
+
+    /**
+     * Notification from the task manager that changed accumulators are transferred on the next
+     * Heartbeat. We need to keep this state to notify the listeners on the next Heartbeat report.
+     */
+    case AccumulatorsChanged(jobID: JobID) =>
+      waitForAccumulatorUpdate.get(jobID) match {
+        case Some((updated, registered)) =>
+          waitForAccumulatorUpdate.put(jobID, (true, registered))
+        case None =>
+      }
+
+    /**
+     * Disabled async processing of accumulator values and send accumulators to the listeners if
+     * we previously received an [[AccumulatorsChanged]] message.
+     */
+    case msg : Heartbeat =>
+      super.handleMessage(msg)
+
+      waitForAccumulatorUpdate foreach {
+        case (jobID, (updated, actors)) if updated =>
+          currentJobs.get(jobID) match {
+            case Some((graph, jobInfo)) =>
+              val flinkAccumulators = graph.getFlinkAccumulators
+              val userAccumulators = graph.aggregateUserAccumulators
+              actors foreach {
+                 actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+              }
+            case None =>
+          }
+          waitForAccumulatorUpdate.put(jobID, (false, actors))
+        case _ =>
+      }
+
+    case RequestWorkingTaskManager(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((eg, _)) =>
+          if(eg.getAllExecutionVertices.asScala.isEmpty){
+            sender ! decorateMessage(WorkingTaskManager(None))
+          } else {
+            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
+
+            if(resource == null){
+              sender ! decorateMessage(WorkingTaskManager(None))
+            } else {
+              sender ! decorateMessage(
+                WorkingTaskManager(
+                  Some(resource.getTaskManagerActorGateway())
+                )
+              )
+            }
+          }
+        case None => sender ! decorateMessage(WorkingTaskManager(None))
+      }
+
+    case NotifyWhenJobStatus(jobID, state) =>
+      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+      jobStatusListener += state -> (listener + sender)
+
+    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+      super.handleMessage(msg)
+
+      val cleanup = waitForJobStatus.get(jobID) match {
+        case Some(stateListener) =>
+          stateListener.remove(newJobStatus) match {
+            case Some(listeners) =>
+              listeners foreach {
+                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
+              }
+            case _ =>
+          }
+          stateListener.isEmpty
+
+        case _ => false
+      }
+
+      if (cleanup) {
+        waitForJobStatus.remove(jobID)
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case DisablePostStop =>
+      postStopEnabled = false
+
+    case RequestSavepoint(savepointPath) =>
+      try {
+        val savepoint = savepointStore.loadSavepoint(savepointPath)
+        sender ! ResponseSavepoint(savepoint)
+      }
+      catch {
+        case e: Exception =>
+          sender ! ResponseSavepoint(null)
+      }
+
+    case msg: Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        val taskManager = sender()
+
+        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+          _ foreach {
+            listener =>
+              listener ! decorateMessage(TaskManagerTerminated(taskManager))
+          }
+        }
+      }
+
+    case NotifyWhenLeader =>
+      if (leaderElectionService.hasLeadership) {
+        sender() ! true
+      } else {
+        waitForLeader += sender()
+      }
+
+    case msg: GrantLeadership =>
+      super.handleMessage(msg)
+
+      waitForLeader.foreach(_ ! true)
+
+      waitForLeader.clear()
+
+    case NotifyWhenClientConnects =>
+      waitForClient += sender()
+      sender() ! true
+
+    case msg: RegisterJobClient =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClientConnected)
+    case msg: RequestClassloadingProps =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
+
+    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+        sender() ! Acknowledge
+      } else {
+        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+      }
+
+    // TaskManager may be registered on this message
+    case msg @ (_: RegisterTaskManager) =>
+      super.handleMessage(msg)
+
+      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+      // fewer registered TaskManagers
+      while (waitForNumRegisteredTaskManagers.nonEmpty &&
+        waitForNumRegisteredTaskManagers.head._1 <=
+          instanceManager.getNumberOfRegisteredTaskManagers) {
+        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+        receiver ! Acknowledge
+      }
+  }
+
+  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
+      case None => false
+    }
+  }
+
+  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall {
+          case vertex =>
+            (vertex.getExecutionState == ExecutionState.RUNNING
+              || vertex.getExecutionState == ExecutionState.FINISHED)
+        }
+      case None => false
+    }
+  }
+
+  def notifyListeners(jobID: JobID): Unit = {
+    if(checkIfAllVerticesRunning(jobID)) {
+      waitForAllVerticesToBeRunning.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+
+    if(checkIfAllVerticesRunningOrFinished(jobID)) {
+      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+  }
+
+  /**
+    * No killing of the VM for testing.
+    */
+  override protected def shutdown(): Unit = {
+    log.info("Shutting down TestingJobManager.")
+    waitForShutdown.foreach(_ ! ComponentShutdown(self))
+    waitForShutdown.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
new file mode 100644
index 0000000..d07c48f
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.Map
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.jobgraph.JobStatus
+
+object TestingJobManagerMessages {
+
+  case class RequestExecutionGraph(jobID: JobID)
+
+  sealed trait ResponseExecutionGraph {
+    def jobID: JobID
+  }
+
+  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+  ResponseExecutionGraph
+
+  case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
+
+  case class WaitForAllVerticesToBeRunning(jobID: JobID)
+  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
+  case class AllVerticesRunning(jobID: JobID)
+
+  case class NotifyWhenJobRemoved(jobID: JobID)
+
+  case class RequestWorkingTaskManager(jobID: JobID)
+  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
+
+  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+  case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+  case object NotifyListeners
+
+  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+  case class TaskManagerTerminated(taskManager: ActorRef)
+
+  /**
+   * Registers a listener to receive a message when accumulators changed.
+   * The change must be explicitly triggered by the TestingTaskManager which can receive an
+   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+   * message by a task that changed the accumulators. This message is then
+   * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
+   * message when the next Heartbeat occurs.
+   */
+  case class NotifyWhenAccumulatorChange(jobID: JobID)
+
+  /**
+   * Reports updated accumulators back to the listener.
+   */
+  case class UpdatedAccumulators(jobID: JobID,
+    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+    userAccumulators: Map[String, Accumulator[_,_]])
+
+  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
+   *
+   */
+  case object NotifyWhenLeader
+
+  /**
+    * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
+    */
+  case object NotifyWhenClientConnects
+  /**
+    * Notifies of client connect
+    */
+  case object ClientConnected
+  /**
+    * Notifies when the client has requested class loading information
+    */
+  case object ClassLoadingPropsDelivered
+
+  /**
+   * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
+   * message when at least numRegisteredTaskManager have registered at the JobManager.
+   *
+   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+   */
+  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
+  /** Disables the post stop method of the [[TestingJobManager]].
+    *
+    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
+    */
+  case object DisablePostStop
+
+  /**
+    * Requests a savepoint from the job manager.
+    *
+    * @param savepointPath The path of the savepoint to request.
+    */
+  case class RequestSavepoint(savepointPath: String)
+
+  /**
+    * Response to a savepoint request.
+    *
+    * @param savepoint The requested savepoint or null if none available.
+    */
+  case class ResponseSavepoint(savepoint: Savepoint)
+
+  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
+  def getDisablePostStop(): AnyRef = DisablePostStop
+
+  def getClientConnected(): AnyRef = ClientConnected
+  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
new file mode 100644
index 0000000..48a1ddd
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
+
+/** Memory archivist extended by testing messages
+  *
+  * @param maxEntries maximum number of archived jobs
+  */
+class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
+
+  override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case RequestExecutionGraph(jobID) =>
+      val executionGraph = graphs.get(jobID)
+      
+      executionGraph match {
+        case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
+        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
new file mode 100644
index 0000000..91d169a
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+
+object TestingMessages {
+
+  case class CheckIfJobRemoved(jobID: JobID)
+
+  case object DisableDisconnect
+
+  case object Alive
+
+  def getAlive: AnyRef = Alive
+
+  def getDisableDisconnect: AnyRef = DisableDisconnect
+
+  case object NotifyOfComponentShutdown
+  case class ComponentShutdown(ref: ActorRef)
+
+  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
new file mode 100644
index 0000000..9b5a147
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+
+import scala.language.postfixOps
+
+/** Subclass of the [[TaskManager]] to support testing messages
+ */
+class TestingTaskManager(
+                          config: TaskManagerConfiguration,
+                          resourceID: ResourceID,
+                          connectionInfo: TaskManagerLocation,
+                          memoryManager: MemoryManager,
+                          ioManager: IOManager,
+                          network: NetworkEnvironment,
+                          numberOfSlots: Int,
+                          leaderRetrievalService: LeaderRetrievalService)
+  extends TaskManager(
+    config,
+    resourceID,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {
+
+  def this(
+            config: TaskManagerConfiguration,
+            connectionInfo: TaskManagerLocation,
+            memoryManager: MemoryManager,
+            ioManager: IOManager,
+            network: NetworkEnvironment,
+            numberOfSlots: Int,
+            leaderRetrievalService: LeaderRetrievalService) {
+    this(
+      config,
+      ResourceID.generate(),
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      numberOfSlots,
+      leaderRetrievalService)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..2498dbe
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Terminated}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
+trait TestingTaskManagerLike extends FlinkActor {
+  that: TaskManager =>
+
+  import scala.collection.JavaConverters._
+
+  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  val waitForRegisteredAtResourceManager =
+    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+
+  /** Map of registered task submit listeners */
+  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
+
+  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+  var disconnectDisabled = false
+
+  /**
+   * Handler for testing related messages
+   */
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case NotifyWhenTaskIsRunning(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+          sender ! decorateMessage(true)
+
+        case _ =>
+          val listeners = waitForRunning.getOrElse(executionID, Set())
+          waitForRunning += (executionID -> (listeners + sender))
+      }
+
+    case RequestRunningTasks =>
+      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+    case NotifyWhenTaskRemoved(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(_) =>
+          val set = waitForRemoval.getOrElse(executionID, Set())
+          waitForRemoval += (executionID -> (set + sender))
+        case None =>
+          if(unregisteredTasks.contains(executionID)) {
+            sender ! decorateMessage(true)
+          } else {
+            val set = waitForRemoval.getOrElse(executionID, Set())
+            waitForRemoval += (executionID -> (set + sender))
+          }
+      }
+
+    case TaskInFinalState(executionID) =>
+      super.handleMessage(TaskInFinalState(executionID))
+      waitForRemoval.remove(executionID) match {
+        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+        case None =>
+      }
+
+      unregisteredTasks += executionID
+
+    case NotifyWhenJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }else{
+        sender ! decorateMessage(true)
+      }
+
+    case CheckIfJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+        sender ! decorateMessage(true)
+      } else {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }
+
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
+
+    case RegisterSubmitTaskListener(jobId) =>
+      registeredSubmitTaskListeners.put(jobId, sender())
+
+    case msg@SubmitTask(tdd) =>
+      registeredSubmitTaskListeners.get(tdd.getJobID) match {
+        case Some(listenerRef) =>
+          listenerRef ! ResponseSubmitTaskListener(tdd)
+        case None =>
+        // Nothing to do
+      }
+
+      super.handleMessage(msg)
+
+    /**
+     * Message from task manager that accumulator values changed and need to be reported immediately
+     * instead of lazily through the
+     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
+     * message to the job manager so that it knows it should report to the listeners.
+     */
+    case msg: AccumulatorsChanged =>
+      currentJobManager match {
+        case Some(jobManager) =>
+          jobManager.forward(msg)
+          sendHeartbeatToJobManager()
+          sender ! true
+        case None =>
+      }
+
+    case msg@Terminated(jobManager) =>
+      super.handleMessage(msg)
+
+      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+        _ foreach {
+          _ ! decorateMessage(JobManagerTerminated(jobManager))
+        }
+      }
+
+    case msg:Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        val jobManager = sender()
+
+        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+          _ foreach {
+            _ ! decorateMessage(JobManagerTerminated(jobManager))
+          }
+        }
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case NotifyOfComponentShutdown =>
+      waitForShutdown += sender()
+
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.handleMessage(msg)
+
+      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        waitForRunning.get(taskExecutionState.getID) foreach {
+          _ foreach (_ ! decorateMessage(true))
+        }
+      }
+
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      if(isConnected && jobManager == currentJobManager.get) {
+        sender() ! true
+      } else {
+        val list = waitForRegisteredAtResourceManager.getOrElse(
+          jobManager,
+          Set[ActorRef]())
+
+        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
+      }
+
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+
+      val jm = sender()
+
+      waitForRegisteredAtResourceManager.remove(jm).foreach {
+        listeners => listeners.foreach{
+          listener =>
+            listener ! true
+        }
+      }
+  }
+
+  /**
+    * No killing of the VM for testing; only notifies the registered shutdown listeners.
+    */
+  override protected def shutdown(): Unit = {
+    log.info("Shutting down TestingTaskManager.")
+    waitForShutdown.foreach(_ ! ComponentShutdown(self))
+    waitForShutdown.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
new file mode 100644
index 0000000..32c3c55
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.taskmanager.Task
+
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+  
+  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
+  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
+  
+  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
+    import collection.JavaConverters._
+    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
+  }
+  
+  case object RequestRunningTasks
+
+  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
+
+  case class JobManagerTerminated(jobManager: ActorRef)
+
+  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
+
+  /**
+   * Message to give a hint to the task manager that accumulator values were updated in the task.
+   * This message is forwarded to the job manager which knows that it needs to notify listeners
+   * of accumulator updates.
+   */
+  case class AccumulatorsChanged(jobID: JobID)
+
+  /**
+    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+    * messages of the given job.
+    *
+    * If a task is submitted with the given job ID the task deployment
+    * descriptor is forwarded to the listener.
+    *
+    * @param jobId The job ID to listen for.
+    */
+  case class RegisterSubmitTaskListener(jobId: JobID)
+
+  /**
+    * A response to a listened job ID containing the submitted task deployment descriptor.
+    *
+    * @param tdd The submitted task deployment descriptor.
+    */
+  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+
+  def getRequestRunningTasksMessage: AnyRef = {
+    RequestRunningTasks
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index ee1b264..00410cc 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,9 +22,10 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, BeforeClass, Test, Assert}
+import org.junit.{AfterClass, Assert, BeforeClass, Test}
 
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
@@ -334,7 +335,7 @@ class ScalaShellITCase extends TestLogger {
 }
 
 object ScalaShellITCase {
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
   val parallelism = 4
 
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 8bb440c..f94ff68 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -134,7 +134,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 	}
 
-	private static ForkableFlinkMiniCluster flinkCluster;
+	private static LocalFlinkMiniCluster flinkCluster;
 
 	// ------------------------------------------------------------------------
 	//  Cluster Setup (Cassandra & Flink)
@@ -205,7 +205,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 
-		flinkCluster = new ForkableFlinkMiniCluster(config);
+		flinkCluster = new LocalFlinkMiniCluster(config);
 		flinkCluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..c4949ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	
 	private static KafkaTestEnvironment kafkaServer;
 	private static Properties standardProps;
-	private static ForkableFlinkMiniCluster flink;
+	private static LocalFlinkMiniCluster flink;
 
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
@@ -88,7 +88,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index eddb57c..771db17 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +65,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static Properties standardProps;
 	
-	protected static ForkableFlinkMiniCluster flink;
+	protected static LocalFlinkMiniCluster flink;
 
 	protected static int flinkPort;
 
@@ -105,7 +105,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 3705943..2e452c1 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +80,7 @@ public class ManualExactlyOnceTest {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		final int flinkPort = flink.getLeaderRPCPort();


[2/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 934a795..6abea2a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		final int flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index ee415d1..29b3a3e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.junit.JUnitSuiteLike
 
@@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase
   with BeforeAndAfterAll {
 
   val parallelism = 4
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
 
   override protected def beforeAll(): Unit = {
     val cluster = Some(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 2ab52b5..18ecfde 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -79,153 +79,4 @@ under the License.
 		</dependency>
 
 	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies 
-						on scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that 
-						dependencies on scala classes can be resolved later in the (Java) test-compile 
-						phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-
-		</plugins>
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											net.alchim31.maven
-										</groupId>
-										<artifactId>
-											scala-maven-plugin
-										</artifactId>
-										<versionRange>
-											[3.1.4,)
-										</versionRange>
-										<goals>
-											<goal>compile</goal>
-											<goal>testCompile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index c5fbaf0..a478908 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
@@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 
 	protected static final int DEFAULT_PARALLELISM = 4;
 
-	protected static ForkableFlinkMiniCluster cluster;
+	protected static LocalFlinkMiniCluster cluster;
 
 	public StreamingMultipleProgramsTestBase() {
 		super(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index c700102..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions;
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 	/** The mini cluster in which this environment executes its jobs */
-	private ForkableFlinkMiniCluster executor;
+	private LocalFlinkMiniCluster executor;
 	
 
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
 		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}
@@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param cluster The test cluster to run the test program on.
 	 * @param parallelism The default parallelism for the test programs.
 	 */
-	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+	public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
 		
 		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c2da691..316fd21 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	protected int numTaskManagers = 1;
 	
 	/** The mini cluster that runs the test programs */
-	protected ForkableFlinkMiniCluster executor;
+	protected LocalFlinkMiniCluster executor;
 	
 
 	public AbstractTestBase(Configuration config) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index d7f09bd..4e83245 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
@@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
 	protected static boolean startWebServer = false;
 
-	protected static ForkableFlinkMiniCluster cluster = null;
+	protected static LocalFlinkMiniCluster cluster = null;
 	
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 4014b80..b774f97 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger {
 	}
 	
 	
-	public static ForkableFlinkMiniCluster startCluster(
+	public static LocalFlinkMiniCluster startCluster(
 		int numTaskManagers,
 		int taskManagerNumSlots,
 		boolean startWebserver,
@@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
 		return startCluster(config, singleActorSystem);
 	}
 
-	public static ForkableFlinkMiniCluster startCluster(
+	public static LocalFlinkMiniCluster startCluster(
 		Configuration config,
 		boolean singleActorSystem) throws Exception {
 
@@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger {
 		
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 
-		ForkableFlinkMiniCluster cluster =  new ForkableFlinkMiniCluster(config, singleActorSystem);
+		LocalFlinkMiniCluster cluster =  new LocalFlinkMiniCluster(config, singleActorSystem);
 
 		cluster.start();
 
@@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger {
 	}
 
 
-	public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+	public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
 		if (logDir != null) {
 			FileUtils.deleteDirectory(logDir);
 		}
@@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger {
 				List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
 
 				for (ActorRef tm : tms) {
-					bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-							.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
-					numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-							.RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+					bcVariableManagerResponseFutures.add(Patterns.ask(
+						tm,
+						TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
+						new Timeout(timeout)));
+
+					numActiveConnectionsResponseFutures.add(Patterns.ask(
+						tm,
+						TaskManagerMessages.getRequestNumActiveConnections(),
+						new Timeout(timeout)));
 				}
 
 				Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
@@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger {
 				Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
 
 				for (Object response : responses) {
-					numUnreleasedBCVars += ((TestingTaskManagerMessages
-							.ResponseBroadcastVariablesWithReferences) response).number();
+					numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
 				}
 
 				Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
@@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger {
 				responses = Await.result(numActiveConnectionsFutureResponses, timeout);
 
 				for (Object response : responses) {
-					numActiveConnections += ((TestingTaskManagerMessages
-							.ResponseNumActiveConnections) response).number();
+					numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7cb88be..aea8152 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final ForkableFlinkMiniCluster executor;
+	private final LocalFlinkMiniCluster executor;
 
 	private TestEnvironment lastEnv = null;
 
@@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		}
 	}
 
-	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
 
@@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
 	}
 
-	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+	public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
 		this(executor, parallelism);
 
 		if (isObjectReuseEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
deleted file mode 100644
index fa3135a..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.Patterns._
-import akka.pattern.ask
-
-import org.apache.curator.test.TestingCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-/**
- * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
- * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
- * uses it to avoid port conflicts.
- *
- * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
- *                          same [[ActorSystem]], otherwise false.
- */
-class ForkableFlinkMiniCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean)
-  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
-
-  def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
-  // --------------------------------------------------------------------------
-
-  var zookeeperCluster: Option[TestingCluster] = None
-
-  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
-    val forkNumberString = System.getProperty("forkNumber")
-
-    val forkNumber = try {
-      Integer.parseInt(forkNumberString)
-    }
-    catch {
-      case e: NumberFormatException => -1
-    }
-
-    val config = userConfiguration.clone()
-
-    if (forkNumber != -1) {
-      val jobManagerRPC = 1024 + forkNumber*400
-      val taskManagerRPC = 1024 + forkNumber*400 + 100
-      val taskManagerData = 1024 + forkNumber*400 + 200
-      val resourceManagerRPC = 1024 + forkNumber*400 + 300
-
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
-    }
-
-    super.generateConfiguration(config)
-  }
-
-  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val jobManagerName = getJobManagerName(index)
-    val archiveName = getArchiveName(index)
-
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if (jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
-    }
-
-    val (jobManager, _) = JobManager.startJobManagerActors(
-      config,
-      actorSystem,
-      Some(jobManagerName),
-      Some(archiveName),
-      classOf[TestingJobManager],
-      classOf[TestingMemoryArchivist])
-
-    jobManager
-  }
-
-  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val resourceManagerName = getResourceManagerName(index)
-
-    val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
-    if (resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
-    }
-
-    val resourceManager = FlinkResourceManager.startResourceManagerActors(
-      config,
-      system,
-      createLeaderRetrievalService(),
-      classOf[TestingResourceManager],
-      resourceManagerName)
-
-    resourceManager
-  }
-
-  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val rpcPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
-    val dataPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    if (rpcPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
-    }
-    if (dataPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
-    }
-
-    val localExecution = numTaskManagers == 1
-
-    TaskManager.startTaskManagerComponentsAndActor(
-      config,
-      ResourceID.generate(),
-      system,
-      hostname,
-      Some(TaskManager.TASK_MANAGER_NAME + index),
-      Some(createLeaderRetrievalService()),
-      localExecution,
-      classOf[TestingTaskManager])
-  }
-
-  def addTaskManager(): Unit = {
-    if (useSingleActorSystem) {
-      (jobManagerActorSystems, taskManagerActors) match {
-        case (Some(jmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been started properly.")
-      }
-    } else {
-      (taskManagerActorSystems, taskManagerActors) match {
-        case (Some(tmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          val newTmSystem = startTaskManagerActorSystem(index)
-          val newTmActor = startTaskManager(index, newTmSystem)
-
-          taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
-          taskManagerActors = Some(tmActors :+ newTmActor)
-
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been started properly.")
-      }
-    }
-  }
-
-  def restartLeadingJobManager(): Unit = {
-    this.synchronized {
-      (jobManagerActorSystems, jobManagerActors) match {
-        case (Some(jmActorSystems), Some(jmActors)) =>
-          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
-          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
-
-          clearLeader()
-
-          val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-          Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-          if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
-          }
-
-          val newJobManagerActorSystem = if(!singleActorSystem) {
-            startJobManagerActorSystem(index)
-          } else {
-            jmActorSystems.head
-          }
-
-          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
-
-          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
-          jobManagerActorSystems = Some(jmActorSystems.patch(
-            index,
-            Seq(newJobManagerActorSystem),
-            1))
-
-          val lrs = createLeaderRetrievalService()
-
-          jobManagerLeaderRetrievalService = Some(lrs)
-          lrs.start(this)
-
-        case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
-          "been started properly.")
-      }
-    }
-  }
-
-
-  def restartTaskManager(index: Int): Unit = {
-    (taskManagerActorSystems, taskManagerActors) match {
-      case (Some(tmActorSystems), Some(tmActors)) =>
-        val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-        Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-        if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
-        }
-
-        val taskManagerActorSystem  = if(!singleActorSystem) {
-          startTaskManagerActorSystem(index)
-        } else {
-          tmActorSystems.head
-        }
-
-        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
-
-        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
-        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
-
-      case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
-        "been started properly.")
-    }
-  }
-
-  override def start(): Unit = {
-    val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
-
-    zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
-      zookeeperURL.equals("")) {
-      LOG.info("Starting ZooKeeper cluster.")
-
-      val testingCluster = new TestingCluster(1)
-
-      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-        testingCluster.getConnectString)
-
-      testingCluster.start()
-
-      Some(testingCluster)
-    } else {
-      None
-    }
-
-    super.start()
-  }
-
-  override def stop(): Unit = {
-    super.stop()
-
-    zookeeperCluster.foreach{
-      LOG.info("Stopping ZooKeeper cluster.")
-      _.close()
-    }
-  }
-
-  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
-    val futures = taskManagerActors.map {
-      _.map {
-        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
-      }
-    }.getOrElse(Seq())
-
-    try {
-      Await.ready(Future.sequence(futures), timeout)
-    } catch {
-      case t: TimeoutException =>
-        throw new Exception("Timeout while waiting for TaskManagers to register at " +
-          s"${jobManager.path}")
-    }
-
-  }
-}
-
-object ForkableFlinkMiniCluster {
-
-  val MAX_RESTART_DURATION = 2 minute
-
-  val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
-
-  def startCluster(
-                    numSlots: Int,
-                    numTaskManagers: Int,
-                    timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
-  : ForkableFlinkMiniCluster = {
-
-    val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
-    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-
-    val cluster = new ForkableFlinkMiniCluster(config)
-
-    cluster.start()
-
-    cluster
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cac8451..cc70fee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,7 +44,7 @@ import static org.junit.Assert.fail;
  */
 public class AccumulatorErrorITCase {
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -53,7 +53,7 @@ public class AccumulatorErrorITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49e18e0..624bfff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -234,7 +234,6 @@ public class AccumulatorLiveITCase {
 				fail("Wrong accumulator results when map task begins execution.");
 			}
 
-
 			int expectedAccVal = 0;
 
 			/* for mapper task */

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9671fce..8a08f15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling;
 import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.junit.After;
@@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger {
 
 	// --------------------------------------------------------------------------------------------
 	
-	protected ForkableFlinkMiniCluster executor;
+	protected LocalFlinkMiniCluster executor;
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
@@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
 
-		this.executor = new ForkableFlinkMiniCluster(config, false);
+		this.executor = new LocalFlinkMiniCluster(config, false);
 		this.executor.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 163fb42..94ff66f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 		config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
 		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index fa5339d..0aee128 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 	private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8915bff..7f1d7f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger {
 	private static int slotsPerTaskManager = 2;
 	private static int numSlots = numTaskManagers * slotsPerTaskManager;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static TestingCluster cluster;
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger {
 		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
 		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
 
-		cluster = new ForkableFlinkMiniCluster(config);
+		cluster = new TestingCluster(config);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 550ba75..7409fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
@@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger {
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 
 		try {
 			// Create a test actor system
@@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 
@@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("JobManager: " + jobManager + ".");
 
 			final Throwable[] error = new Throwable[1];
-			final ForkableFlinkMiniCluster finalFlink = flink;
+			final TestingCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
 			new JavaTestKit(testActorSystem) {{
 
@@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger {
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 		List<File> checkpointFiles = new ArrayList<>();
 
 		try {
@@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 
@@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger {
 		// Test deadline
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
-		ForkableFlinkMiniCluster flink = null;
+		TestingCluster flink = null;
 
 		try {
 			// Flink configuration
@@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
-			flink = new ForkableFlinkMiniCluster(config);
+			flink = new TestingCluster(config);
 			LOG.info("Starting Flink cluster.");
 			flink.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index cf15052..6bf511f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
 import org.apache.flink.util.TestLogger;
@@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 	private static final int NUM_TASK_SLOTS = 3;
 	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 			config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 67c05e5..5f6cd4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	protected static final int NUM_TASK_SLOTS = 4;
 	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 			
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2e6ce78..e424a8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8b56d3d..7afafe4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.classloading;
 
-import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -37,9 +36,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-	private static ForkableFlinkMiniCluster testCluster;
+	private static TestingCluster testCluster;
 
 	private static int parallelism;
 
@@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger {
 		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
-		testCluster = new ForkableFlinkMiniCluster(config, false);
+		testCluster = new TestingCluster(config, false);
 		testCluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index c9059f1..a74ed34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-
 /**
  * Tests retrieval of a job from a running Flink cluster
  */
@@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger {
 
 	@BeforeClass
 	public static void before() {
-		cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+		cluster = new TestingCluster(new Configuration(), false);
 		cluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..178656d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase {
 	
 	private static final int NUM_SLOTS = 20;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	private static JobGraph workingJobGraph;
 
 	@BeforeClass
@@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			
-			cluster = new ForkableFlinkMiniCluster(config);
+			cluster = new LocalFlinkMiniCluster(config);
 
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index ca2c156..133ebd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger {
 	//  The mini cluster that is shared across tests
 	// ------------------------------------------------------------------------
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setup() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 34a7eed..e18e82a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,10 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase {
 
 	private static final String VALID_STARTUP_TIMEOUT = "100 s";
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setupCluster() {
@@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 09b5e7e..a67e6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -49,14 +49,14 @@ public class AutoParallelismITCase {
 	private static final int SLOTS_PER_TM = 7;
 	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void setupCluster() {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index f30f61f..51f3534 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.types.Value;
 
 import org.junit.AfterClass;
@@ -43,7 +43,7 @@ public class CustomSerializationITCase {
 
 	private static final int PARLLELISM = 5;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
@@ -51,7 +51,7 @@ public class CustomSerializationITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 42419fb..06b93ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.apache.flink.util.Collector;
 
@@ -52,7 +52,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class MiscellaneousIssuesITCase {
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	
 	@BeforeClass
 	public static void startCluster() {
@@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 12b7a68..a43bab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 	
 	@Test
 	public void testSuccessfulProgramAfterFailure() {
-		ForkableFlinkMiniCluster cluster = null;
+		LocalFlinkMiniCluster cluster = null;
 		
 		try {
 			Configuration config = new Configuration();
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
 			
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40732df..b99858a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
@@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger {
 	 * Shared between all the test. Make sure to have at least NUM_SLOTS
 	 * available after your test finishes, e.g. cancel the job you submitted.
 	 */
-	private static ForkableFlinkMiniCluster cluster;
+	private static TestingCluster cluster;
 
 	@BeforeClass
 	public static void setup() {
@@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger {
 			config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1);
 			config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new TestingCluster(config, false);
 			cluster.start(true);
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a45d62..8a43ee4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 		
-		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 		
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 0c5d14b..a0d6b58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas
 		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
 		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 6355a8f..f09efc5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover
 		config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
 		config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
 		cluster.start();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
index 004340c..bf7c524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -42,7 +42,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public abstract class SimpleRecoveryITCaseBase {
 
-	protected static ForkableFlinkMiniCluster cluster;
+	protected static LocalFlinkMiniCluster cluster;
 
 	@AfterClass
 	public static void teardownCluster() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 6c621ac..5d29905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase {
 
 		final int PARALLELISM = 4;
 
-		ForkableFlinkMiniCluster cluster = null;
+		LocalFlinkMiniCluster cluster = null;
 		ActorSystem additionalSystem = null;
 
 		try {
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase {
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
 			config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 7710f06..0b008eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
@@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 
 		
 		
-		ForkableFlinkMiniCluster flink = null;
+		LocalFlinkMiniCluster flink = null;
 		try {
 			final String addressString = ipv6address.getHostAddress();
 			log.info("Test will use IPv6 address " + addressString + " for connection tests");
@@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 			
-			flink = new ForkableFlinkMiniCluster(conf, false);
+			flink = new LocalFlinkMiniCluster(conf, false);
 			flink.start();
 
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());


[4/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Posted by tr...@apache.org.
[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Rename _configuration to originalConfiguration

Remove testing classes from main scope in flink-runtime

Previously, the ForkableFlinkMiniCluster, which resided in flink-test-utils, required
these classes to be in the main scope of flink-runtime. With the removal of the
ForkableFlinkMiniCluster, they are no longer needed in the main scope and can be moved
back to the test scope.

This closes #2450.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b852e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b852e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b852e3

Branch: refs/heads/master
Commit: 02b852e3571e46f25fdfc79f43ceb726ddff9ba7
Parents: 920cda4
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 31 17:58:09 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:17:28 2016 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   7 +-
 .../flink/contrib/streaming/CollectITCase.java  |   4 +-
 .../operations/DegreesWithExceptionITCase.java  |   6 +-
 .../ReduceOnEdgesWithExceptionITCase.java       |   6 +-
 .../ReduceOnNeighborsWithExceptionITCase.java   |   6 +-
 .../apache/flink/ml/util/FlinkTestBase.scala    |  11 +-
 .../clusterframework/FlinkResourceManager.java  |  13 +-
 .../testutils/TestingResourceManager.java       | 137 ------
 .../flink/runtime/jobmanager/JobManager.scala   |  45 +-
 .../runtime/messages/TaskManagerMessages.scala  |  26 ++
 .../runtime/minicluster/FlinkMiniCluster.scala  |  73 +++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 235 ++++++++---
 .../flink/runtime/taskmanager/TaskManager.scala | 130 ++++--
 .../testingUtils/TestingJobManager.scala        |  72 ----
 .../testingUtils/TestingJobManagerLike.scala    | 417 -------------------
 .../TestingJobManagerMessages.scala             | 133 ------
 .../testingUtils/TestingMemoryArchivist.scala   |  43 --
 .../runtime/testingUtils/TestingMessages.scala  |  40 --
 .../testingUtils/TestingTaskManager.scala       |  70 ----
 .../testingUtils/TestingTaskManagerLike.scala   | 248 -----------
 .../TestingTaskManagerMessages.scala            |  94 -----
 .../LeaderElectionRetrievalTestingCluster.java  |   3 +-
 .../testutils/TestingResourceManager.java       | 137 ++++++
 .../runtime/testingUtils/TestingCluster.scala   | 322 ++++++++------
 .../testingUtils/TestingJobManager.scala        |  71 ++++
 .../testingUtils/TestingJobManagerLike.scala    | 417 +++++++++++++++++++
 .../TestingJobManagerMessages.scala             | 132 ++++++
 .../testingUtils/TestingMemoryArchivist.scala   |  43 ++
 .../runtime/testingUtils/TestingMessages.scala  |  40 ++
 .../testingUtils/TestingTaskManager.scala       |  70 ++++
 .../testingUtils/TestingTaskManagerLike.scala   | 234 +++++++++++
 .../TestingTaskManagerMessages.scala            |  82 ++++
 .../flink/api/scala/ScalaShellITCase.scala      |   7 +-
 .../cassandra/CassandraConnectorITCase.java     |   6 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   6 +-
 .../connectors/kafka/KafkaTestBase.java         |   6 +-
 .../manualtests/ManualExactlyOnceTest.java      |   4 +-
 ...nualExactlyOnceWithStreamReshardingTest.java |   4 +-
 ...ScalaStreamingMultipleProgramsTestBase.scala |   5 +-
 .../flink-test-utils/pom.xml                    | 149 -------
 .../util/StreamingMultipleProgramsTestBase.java |   4 +-
 .../streaming/util/TestStreamEnvironment.java   |   8 +-
 .../flink/test/util/AbstractTestBase.java       |   3 +-
 .../test/util/MultipleProgramsTestBase.java     |   3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  31 +-
 .../apache/flink/test/util/TestEnvironment.java |   7 +-
 .../test/util/ForkableFlinkMiniCluster.scala    | 335 ---------------
 .../accumulators/AccumulatorErrorITCase.java    |   6 +-
 .../accumulators/AccumulatorLiveITCase.java     |   1 -
 .../test/cancelling/CancelingTestBase.java      |   7 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   6 +-
 .../EventTimeWindowCheckpointingITCase.java     |   6 +-
 .../test/checkpointing/RescalingITCase.java     |   6 +-
 .../test/checkpointing/SavepointITCase.java     |  19 +-
 .../StreamCheckpointNotifierITCase.java         |   6 +-
 .../StreamFaultToleranceTestBase.java           |   6 +-
 .../WindowCheckpointingITCase.java              |   6 +-
 .../test/classloading/ClassLoaderITCase.java    |   7 +-
 .../clients/examples/JobRetrievalITCase.java    |   5 +-
 .../JobSubmissionFailsITCase.java               |   6 +-
 .../CustomDistributionITCase.java               |   4 +-
 .../RemoteEnvironmentITCase.java                |   7 +-
 .../flink/test/misc/AutoParallelismITCase.java  |   6 +-
 .../test/misc/CustomSerializationITCase.java    |   6 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   6 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |   6 +-
 .../flink/test/query/QueryableStateITCase.java  |   6 +-
 .../flink/test/recovery/FastFailuresITCase.java |   4 +-
 ...SimpleRecoveryFailureRateStrategyITBase.java |   6 +-
 ...RecoveryFixedDelayRestartStrategyITBase.java |   6 +-
 .../test/recovery/SimpleRecoveryITCaseBase.java |   4 +-
 .../TaskManagerFailureRecoveryITCase.java       |   6 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |   6 +-
 .../ZooKeeperLeaderElectionITCase.java          |  56 +--
 .../test/streaming/runtime/TimestampITCase.java |   6 +-
 .../flink/test/web/WebFrontendITCase.java       |   6 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   8 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  12 +-
 flink-yarn-tests/pom.xml                        |   8 +
 .../org/apache/flink/yarn/YarnTestBase.java     |   1 -
 tools/maven/scalastyle-config.xml               |   2 +-
 81 files changed, 2037 insertions(+), 2167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 29a7e58..1030ff8 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,12 +25,11 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class AvroExternalJarProgramITCase {
 
 	private static final String JAR_FILE = "maven-test-jar.jar";
@@ -40,12 +39,12 @@ public class AvroExternalJarProgramITCase {
 	@Test
 	public void testExternalProgram() {
 
-		ForkableFlinkMiniCluster testMiniCluster = null;
+		LocalFlinkMiniCluster testMiniCluster = null;
 
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
 			testMiniCluster.start();
 
 			String jarFile = JAR_FILE;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 10ea85c..d691621 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -19,9 +19,9 @@
 package org.apache.flink.contrib.streaming;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class CollectITCase {
 
 	@Test
 	public void testCollect() throws Exception {
-		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+		final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
 		try {
 			cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 551a97b..02eea07 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.apache.flink.types.LongValue;
 import org.junit.AfterClass;
@@ -39,7 +39,7 @@ public class DegreesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	
 
 	@BeforeClass
@@ -47,7 +47,7 @@ public class DegreesWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 56a0a59..666f7ef 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -42,7 +42,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -50,7 +50,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 7458e08..0bbdc84 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
 import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -43,7 +43,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -51,7 +51,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index fb98f24..6353d6a 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
 import org.scalatest.{BeforeAndAfter, Suite}
 
-/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
+/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
   * Additionally a TestEnvironment with the started cluster is created and set as the default
   * [[org.apache.flink.api.java.ExecutionEnvironment]].
   *
-  * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given
+  * This mixin starts a LocalFlinkMiniCluster with one TaskManager and a number of slots given
   * by parallelism. This value can be overridden in a sub class in order to start the cluster
   * with a different number of slots.
   *
@@ -37,7 +38,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
   * @example
   *          {{{
   *            def testSomething: Unit = {
-  *             // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+  *             // Obtain TestEnvironment with started LocalFlinkMiniCluster
   *             val env = ExecutionEnvironment.getExecutionEnvironment
   *
   *             env.fromCollection(...)
@@ -50,7 +51,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
 trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
   val parallelism = 4
 
   before {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 95be084..7ea286d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -767,8 +767,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			Class<? extends FlinkResourceManager<?>> resourceManagerClass,
 			String resourceManagerActorName) {
 
-		Props resourceMasterProps = Props.create(resourceManagerClass, configuration, leaderRetriever);
+		Props resourceMasterProps = getResourceManagerProps(
+			resourceManagerClass,
+			configuration,
+			leaderRetriever);
 
 		return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName);
 	}
+
+	public static Props getResourceManagerProps(
+		Class<? extends FlinkResourceManager> resourceManagerClass,
+		Configuration configuration,
+		LeaderRetrievalService leaderRetrievalService) {
+
+		return Props.create(resourceManagerClass, configuration, leaderRetrievalService);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
deleted file mode 100644
index 495cacd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * A testing resource manager which may alter the default standalone resource master's behavior.
- */
-public class TestingResourceManager extends StandaloneResourceManager {
-
-	/** Set of Actors which want to be informed of a connection to the job manager */
-	private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
-
-	/** Set of Actors which want to be informed of a shutdown */
-	private Set<ActorRef> waitForShutdown = new HashSet<>();
-
-	/** Flag to signal a connection to the JobManager */
-	private boolean isConnected = false;
-
-	public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
-		super(flinkConfig, leaderRetriever);
-	}
-
-	/**
-	 * Overwrite messages here if desired
-	 */
-	@Override
-	protected void handleMessage(Object message) {
-
-		if (message instanceof GetRegisteredResources) {
-			sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
-		} else if (message instanceof FailResource) {
-			ResourceID resourceID = ((FailResource) message).resourceID;
-			notifyWorkerFailed(resourceID, "Failed for test case.");
-
-		} else if (message instanceof NotifyWhenResourceManagerConnected) {
-			if (isConnected) {
-				sender().tell(
-					Messages.getAcknowledge(),
-					self());
-			} else {
-				waitForResourceManagerConnected.add(sender());
-			}
-		} else if (message instanceof RegisterResourceManagerSuccessful) {
-			super.handleMessage(message);
-
-			isConnected = true;
-
-			for (ActorRef ref : waitForResourceManagerConnected) {
-				ref.tell(
-					Messages.getAcknowledge(),
-					self());
-			}
-			waitForResourceManagerConnected.clear();
-
-		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
-			waitForShutdown.add(sender());
-		} else if (message instanceof TestingMessages.Alive$) {
-			sender().tell(Messages.getAcknowledge(), self());
-		} else {
-			super.handleMessage(message);
-		}
-	}
-
-	/**
-	 * Testing messages
-	 */
-	public static class GetRegisteredResources {}
-
-	public static class GetRegisteredResourcesReply {
-
-		public Collection<ResourceID> resources;
-
-		public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
-			this.resources = resources;
-		}
-
-	}
-
-	/**
-	 * Fails all resources that the resource manager has registered
-	 */
-	public static class FailResource {
-
-		public ResourceID resourceID;
-
-		public FailResource(ResourceID resourceID) {
-			this.resourceID = resourceID;
-		}
-	}
-
-	/**
-	 * The sender of this message will be informed of a connection to the Job Manager
-	 */
-	public static class NotifyWhenResourceManagerConnected {}
-
-	/**
-	 * Inform registered listeners about a shutdown of the application.
-     */
-	@Override
-	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
-		for (ActorRef listener : waitForShutdown) {
-			listener.tell(new TestingMessages.ComponentShutdown(self()), self());
-		}
-		waitForShutdown.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 88af604..f67be0e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -2721,7 +2721,7 @@ object JobManager {
       configuration,
       None)
 
-    val archiveProps = Props(archiveClass, archiveCount)
+    val archiveProps = getArchiveProps(archiveClass, archiveCount)
 
     // start the archiver with the given name, or without (avoid name conflicts)
     val archive: ActorRef = archiveActorName match {
@@ -2729,7 +2729,7 @@ object JobManager {
       case None => actorSystem.actorOf(archiveProps)
     }
 
-    val jobManagerProps = Props(
+    val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
       executorService,
@@ -2754,6 +2754,45 @@ object JobManager {
     (jobManager, archive)
   }
 
+  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+    Props(archiveClass, archiveCount)
+  }
+
+  def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry]): Props = {
+
+    Props(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+  }
+
   // --------------------------------------------------------------------------
   //  Resolving the JobManager endpoint
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 2d99245..b433015 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -130,6 +130,16 @@ object TaskManagerMessages {
     */
   case class RequestTaskManagerLog(requestType : LogTypeRequest)
 
+  /** Requests the number of active connections at the ConnectionManager */
+  case object RequestNumActiveConnections
+
+  case class ResponseNumActiveConnections(number: Int)
+
+  /** Requests the number of broadcast variables with references */
+  case object RequestBroadcastVariablesWithReferences
+
+  case class ResponseBroadcastVariablesWithReferences(number: Int)
+
 
   // --------------------------------------------------------------------------
   //  Utility getters for case objects to simplify access from Java
@@ -166,4 +176,20 @@ object TaskManagerMessages {
   def getRequestTaskManagerStdout(): AnyRef = {
     RequestTaskManagerLog(StdOutFileRequest)
   }
+
+  /**
+    * Accessor for the case object instance, to simplify Java interoperability.
+    * @return The RequestBroadcastVariablesWithReferences case object instance.
+    */
+  def getRequestBroadcastVariablesWithReferences(): RequestBroadcastVariablesWithReferences.type = {
+    RequestBroadcastVariablesWithReferences
+  }
+
+  /**
+    * Accessor for the case object instance, to simplify Java interoperability.
+    * @return The RequestNumActiveConnections case object instance.
+    */
+  def getRequestNumActiveConnections(): RequestNumActiveConnections.type  = {
+    RequestNumActiveConnections
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index a547d25..0178bd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
     ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
     InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val originalConfiguration = generateConfiguration(userConfiguration)
 
   /** Future to the [[ActorGateway]] of the current leader */
   var leaderGateway: Promise[ActorGateway] = Promise()
@@ -79,16 +79,16 @@ abstract class FlinkMiniCluster(
 
   /** Future lock */
   val futureLock = new Object()
-  
+
   implicit val executionContext = ExecutionContext.global
 
-  implicit val timeout = AkkaUtils.getTimeout(configuration)
+  implicit val timeout = AkkaUtils.getTimeout(originalConfiguration)
 
-  val haMode = HighAvailabilityMode.fromConfig(configuration)
+  val haMode = HighAvailabilityMode.fromConfig(originalConfiguration)
 
   val numJobManagers = getNumberOfJobManagers
 
-  var numTaskManagers = configuration.getInteger(
+  var numTaskManagers = originalConfiguration.getInteger(
     ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
     ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
 
@@ -105,6 +105,22 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  def configuration: Configuration = {
+    if (originalConfiguration.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+      val leaderConfiguration = new Configuration(originalConfiguration)
+
+      val leaderPort = getLeaderRPCPort
+
+      leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+
+      leaderConfiguration
+    } else {
+      originalConfiguration
+    }
+  }
+
   // --------------------------------------------------------------------------
   //                           Abstract Methods
   // --------------------------------------------------------------------------
@@ -125,7 +141,7 @@ abstract class FlinkMiniCluster(
     if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
-      configuration.getInteger(
+      originalConfiguration.getInteger(
         ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
       )
@@ -136,7 +152,7 @@ abstract class FlinkMiniCluster(
     if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
-      configuration.getInteger(
+      originalConfiguration.getInteger(
         ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
       )
@@ -177,40 +193,55 @@ abstract class FlinkMiniCluster(
     Await.result(indexFuture, timeout)
   }
 
+  def getLeaderRPCPort: Int = {
+    val index = getLeaderIndex(timeout)
+
+    jobManagerActorSystems match {
+      case Some(jmActorSystems) =>
+        AkkaUtils.getAddress(jmActorSystems(index)).port match {
+          case Some(p) => p
+          case None => -1
+        }
+
+      case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+                                         "started properly.")
+    }
+  }
+
   def getResourceManagerAkkaConfig(index: Int): Config = {
     if (useSingleActorSystem) {
-      AkkaUtils.getAkkaConfig(configuration, None)
+      AkkaUtils.getAkkaConfig(originalConfiguration, None)
     } else {
-      val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+                                                  ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 
-      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+      AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
     }
   }
 
   def getJobManagerAkkaConfig(index: Int): Config = {
     if (useSingleActorSystem) {
-      AkkaUtils.getAkkaConfig(configuration, None)
+      AkkaUtils.getAkkaConfig(originalConfiguration, None)
     }
     else {
-      val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+                                                  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 
-      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+      AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
     }
   }
 
   def getTaskManagerAkkaConfig(index: Int): Config = {
-    val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+                                                ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
 
     val resolvedPort = if(port != 0) port + index else port
 
-    AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+    AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
   }
 
   /**
@@ -257,7 +288,7 @@ abstract class FlinkMiniCluster(
           "The FlinkMiniCluster has not been started yet.")
       }
     } else {
-      JobClient.startJobClientActorSystem(configuration)
+      JobClient.startJobClientActorSystem(originalConfiguration)
     }
   }
 
@@ -320,7 +351,7 @@ abstract class FlinkMiniCluster(
 
     val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
 
-    webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+    webMonitor = startWebServer(originalConfiguration, jmActorSystems(0), jobManagerAkkaURL)
 
     if(waitForTaskManagerRegistration) {
       waitForTaskManagersToBeRegistered()
@@ -528,7 +559,7 @@ abstract class FlinkMiniCluster(
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {
-          ZooKeeperUtils.createLeaderRetrievalService(configuration)
+          ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration)
         }
 
       case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d30c047..cac5d91 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,23 +18,36 @@
 
 package org.apache.flink.runtime.minicluster
 
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.JobID
+import java.util.concurrent.ExecutorService
 
+import akka.actor.{ActorRef, ActorSystem, Props}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
-import org.apache.flink.runtime.messages.JobManagerMessages.{CancellationFailure, CancellationResponse, StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs}
-import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 
 import scala.concurrent.Await
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -65,8 +78,25 @@ class LocalFlinkMiniCluster(
     config
   }
 
+  //------------------------------------------------------------------------------------------------
+  // Actor classes
+  //------------------------------------------------------------------------------------------------
+
+  val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager]
+
+  val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager]
+
+  val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist]
+
+  val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+    classOf[StandaloneResourceManager]
+
+  //------------------------------------------------------------------------------------------------
+  // Start methods for the distributed components
+  //------------------------------------------------------------------------------------------------
+
   override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val jobManagerName = getJobManagerName(index)
     val archiveName = getArchiveName(index)
@@ -79,19 +109,48 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (jobManager, _) = JobManager.startJobManagerActors(
-      config,
-      system,
-      Some(jobManagerName),
-      Some(archiveName),
-      classOf[JobManager],
-      classOf[MemoryArchivist])
-
-    jobManager
+    val (executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    restartStrategyFactory,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobGraphStore,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+
+    val archive = system.actorOf(
+      getArchiveProps(
+        memoryArchivistClass,
+        archiveCount),
+      archiveName)
+
+    system.actorOf(
+      getJobManagerProps(
+        jobManagerClass,
+        config,
+        executorService,
+        instanceManager,
+        scheduler,
+        libraryCacheManager,
+        archive,
+        restartStrategyFactory,
+        timeout,
+        leaderElectionService,
+        submittedJobGraphStore,
+        checkpointRecoveryFactory,
+        savepointStore,
+        jobRecoveryTimeout,
+        metricsRegistry),
+      jobManagerName)
   }
 
   override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val resourceManagerName = getResourceManagerName(index)
 
@@ -103,18 +162,16 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
     }
 
-    val resourceManager = FlinkResourceManager.startResourceManagerActors(
+    val resourceManagerProps = getResourceManagerProps(
+      resourceManagerClass,
       config,
-      system,
-      createLeaderRetrievalService(),
-      classOf[StandaloneResourceManager],
-      resourceManagerName)
+      createLeaderRetrievalService())
 
-    resourceManager
+    system.actorOf(resourceManagerProps, resourceManagerName)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val rpcPort = config.getInteger(
       ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
@@ -138,32 +195,115 @@ class LocalFlinkMiniCluster(
     } else {
       TaskManager.TASK_MANAGER_NAME
     }
-    
-    TaskManager.startTaskManagerComponentsAndActor(
+
+    val resourceID = ResourceID.generate() // generate random resource id
+
+    val (taskManagerConfig,
+    taskManagerLocation,
+    memoryManager,
+    ioManager,
+    network,
+    leaderRetrievalService) = TaskManager.createTaskManagerComponents(
       config,
-      ResourceID.generate(), // generate random resource id
-      system,
+      resourceID,
       hostname, // network interface to bind to
-      Some(taskManagerActorName), // actor name
-      Some(createLeaderRetrievalService()), // job manager leader retrieval service
       localExecution, // start network stack?
-      classOf[TaskManager])
+      Some(createLeaderRetrievalService()))
+
+    val props = getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService)
+
+    system.actorOf(props, taskManagerActorName)
   }
 
-  def getLeaderRPCPort: Int = {
-    val index = getLeaderIndex(timeout)
+  //------------------------------------------------------------------------------------------------
+  // Props for the distributed components
+  //------------------------------------------------------------------------------------------------
 
-    jobManagerActorSystems match {
-      case Some(jmActorSystems) =>
-        AkkaUtils.getAddress(jmActorSystems(index)).port match {
-          case Some(p) => p
-          case None => -1
-        }
+  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+    JobManager.getArchiveProps(archiveClass, archiveCount)
+  }
 
-      case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
-        "started properly.")
-    }
+  def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry]): Props = {
+
+    JobManager.getJobManagerProps(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+  }
+
+  def getTaskManagerProps(
+    taskManagerClass: Class[_ <: TaskManager],
+    taskManagerConfig: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    taskManagerLocation: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    networkEnvironment: NetworkEnvironment,
+    leaderRetrievalService: LeaderRetrievalService): Props = {
+
+    TaskManager.getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      networkEnvironment,
+      leaderRetrievalService)
+  }
+
+  def getResourceManagerProps(
+    resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]],
+    configuration: Configuration,
+    leaderRetrievalService: LeaderRetrievalService): Props = {
+
+    FlinkResourceManager.getResourceManagerProps(
+      resourceManagerClass,
+      configuration,
+      leaderRetrievalService)
+  }
+
+  //------------------------------------------------------------------------------------------------
+  // Helper methods
+  //------------------------------------------------------------------------------------------------
 
+  def createLeaderElectionService(): Option[LeaderElectionService] = {
+    None
   }
 
   def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -186,7 +326,7 @@ class LocalFlinkMiniCluster(
       val bufferSize: Int = config.getInteger(
         ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
         ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-      
+
       val bufferMem: Long = config.getLong(
         ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
         ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
@@ -218,6 +358,7 @@ class LocalFlinkMiniCluster(
     val config: Configuration = new Configuration()
 
     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
       ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -252,11 +393,11 @@ class LocalFlinkMiniCluster(
       JobManager.ARCHIVE_NAME
     }
   }
-  
+
   // --------------------------------------------------------------------------
   //  Actions on running jobs
   // --------------------------------------------------------------------------
-  
+
   def currentlyRunningJobs: Iterable[JobID] = {
     val leader = getLeaderGateway(timeout)
     val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout)
@@ -269,7 +410,7 @@ class LocalFlinkMiniCluster(
     currentlyRunningJobs.foreach(list.add)
     list
   }
-  
+
   def stopJob(id: JobID) : Unit = {
     val leader = getLeaderGateway(timeout)
     val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 84750a3..de85f30 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -354,6 +354,21 @@ class TaskManager(
         case None =>
           sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
       }
+
+    case RequestBroadcastVariablesWithReferences =>
+      sender ! decorateMessage(
+        ResponseBroadcastVariablesWithReferences(
+          bcVarManager.getNumberOfVariablesWithReferences)
+      )
+
+    case RequestNumActiveConnections =>
+      val numActive = if (!network.isShutdown) {
+        network.getConnectionManager.getNumberOfActiveConnections
+      } else {
+        0
+      }
+
+      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
   }
 
   /**
@@ -1781,6 +1796,7 @@ object TaskManager {
   }
 
   /**
+   * Starts the task manager actor.
    *
    * @param configuration The configuration for the TaskManager.
    * @param resourceID The id of the resource which the task manager will run on.
@@ -1817,11 +1833,75 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
-    val (taskManagerConfig : TaskManagerConfiguration,      
-      netConfig: NetworkEnvironmentConfiguration,
-      taskManagerAddress: InetSocketAddress,
-      memType: MemoryType
-    ) = parseTaskManagerConfiguration(
+    val (taskManagerConfig,
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService) = createTaskManagerComponents(
+      configuration,
+      resourceID,
+      taskManagerHostname,
+      localTaskManagerCommunication,
+      leaderRetrievalServiceOption)
+
+    // create the actor properties (which define the actor constructor parameters)
+    val tmProps = getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService)
+
+    taskManagerActorName match {
+      case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
+      case None => actorSystem.actorOf(tmProps)
+    }
+  }
+
+  def getTaskManagerProps(
+    taskManagerClass: Class[_ <: TaskManager],
+    taskManagerConfig: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    taskManagerLocation: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    networkEnvironment: NetworkEnvironment,
+    leaderRetrievalService: LeaderRetrievalService
+  ): Props = {
+    Props(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      networkEnvironment,
+      taskManagerConfig.numberOfSlots,
+      leaderRetrievalService)
+  }
+
+  def createTaskManagerComponents(
+    configuration: Configuration,
+    resourceID: ResourceID,
+    taskManagerHostname: String,
+    localTaskManagerCommunication: Boolean,
+    leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
+      (TaskManagerConfiguration,
+      TaskManagerLocation,
+      MemoryManager,
+      IOManager,
+      NetworkEnvironment,
+      LeaderRetrievalService) = {
+
+    val (taskManagerConfig : TaskManagerConfiguration,
+    netConfig: NetworkEnvironmentConfiguration,
+    taskManagerAddress: InetSocketAddress,
+    memType: MemoryType
+      ) = parseTaskManagerConfiguration(
       configuration,
       taskManagerHostname,
       localTaskManagerCommunication)
@@ -1895,10 +1975,10 @@ object TaskManager {
     // check if a value has been configured
     val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
     checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-      ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-      "MemoryManager needs at least one MB of memory. " +
-        "If you leave this config parameter empty, the system automatically " +
-        "pick a fraction of the available memory.")
+                         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+                         "MemoryManager needs at least one MB of memory. " +
+                           "If you leave this config parameter empty, the system automatically " +
+                           "pick a fraction of the available memory.")
 
 
     val preAllocateMemory = configuration.getBoolean(
@@ -1910,7 +1990,7 @@ object TaskManager {
         LOG.info(s"Using $configuredMemory MB for managed memory.")
       } else {
         LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
-          s"memory will be allocated lazily.")
+                   s"memory will be allocated lazily.")
       }
       configuredMemory << 20 // megabytes to bytes
     }
@@ -1928,10 +2008,10 @@ object TaskManager {
 
         if (preAllocateMemory) {
           LOG.info(s"Using $fraction of the currently free heap space for managed " +
-            s"heap memory (${relativeMemSize >> 20} MB).")
+                     s"heap memory (${relativeMemSize >> 20} MB).")
         } else {
           LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
-            s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
+                     s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
         }
 
         relativeMemSize
@@ -1944,10 +2024,10 @@ object TaskManager {
 
         if (preAllocateMemory) {
           LOG.info(s"Using $fraction of the maximum memory size for " +
-            s"managed off-heap memory (${directMemorySize >> 20} MB).")
+                     s"managed off-heap memory (${directMemorySize >> 20} MB).")
         } else {
           LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
-            s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
+                     s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
         }
 
         directMemorySize
@@ -1971,12 +2051,12 @@ object TaskManager {
         memType match {
           case MemoryType.HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
+                      s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
 
           case MemoryType.OFF_HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
-              s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+                      s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
+                      s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
 
           case _ => throw e
         }
@@ -1990,22 +2070,12 @@ object TaskManager {
       case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
-    // create the actor properties (which define the actor constructor parameters)
-    val tmProps = Props(
-      taskManagerClass,
-      taskManagerConfig,
-      resourceID,
+    (taskManagerConfig,
       taskManagerLocation,
       memoryManager,
       ioManager,
       network,
-      taskManagerConfig.numberOfSlots,
       leaderRetrievalService)
-
-    taskManagerActorName match {
-      case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
-      case None => actorSystem.actorOf(tmProps)
-    }
   }
 
 
@@ -2055,8 +2125,8 @@ object TaskManager {
    * @param taskManagerHostname The host name under which the TaskManager communicates.
    * @param localTaskManagerCommunication True, to skip initializing the network stack.
    *                                      Use only in cases where only one task manager runs.
-   * @return A tuple (TaskManagerConfiguration, network configuration,
-   *                  InstanceConnectionInfo, JobManager actor Akka URL).
+   * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
+    *         memory type).
    */
   @throws(classOf[IllegalArgumentException])
   def parseTaskManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
deleted file mode 100644
index 16331ac..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
-
-/** JobManager implementation extended by testing messages
-  *
-  */
-class TestingJobManager(
-    flinkConfiguration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore : SavepointStore,
-    jobRecoveryTimeout : FiniteDuration,
-    metricRegistry : Option[MetricRegistry])
-  extends JobManager(
-    flinkConfiguration,
-      executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricRegistry)
-  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
deleted file mode 100644
index 3947b17..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Cancellable, Terminated}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a JobManager with messages for testing purpose.  */
-trait TestingJobManagerLike extends FlinkActor {
-  that: JobManager =>
-
-  import context._
-
-  import scala.collection.JavaConverters._
-
-  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
-  val waitForAllVerticesToBeRunningOrFinished =
-    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
-  var periodicCheck: Option[Cancellable] = None
-
-  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
-    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
-  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
-  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
-  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
-    new Ordering[(Int, ActorRef)] {
-      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
-
-  val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  var postStopEnabled = true
-
-  abstract override def postStop(): Unit = {
-    if (postStopEnabled) {
-      super.postStop()
-    } else {
-      // only stop leader election service to revoke the leadership of this JM so that a new JM
-      // can be elected leader
-      leaderElectionService.stop()
-    }
-  }
-
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case RequestExecutionGraph(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
-          ExecutionGraphFound(
-            jobID,
-            executionGraph)
-        )
-
-        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
-      }
-
-    case WaitForAllVerticesToBeRunning(jobID) =>
-      if(checkIfAllVerticesRunning(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
-      if(checkIfAllVerticesRunningOrFinished(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-
-    case NotifyListeners =>
-      for(jobID <- currentJobs.keySet){
-        notifyListeners(jobID)
-      }
-
-      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
-        periodicCheck foreach { _.cancel() }
-        periodicCheck = None
-      }
-
-
-    case NotifyWhenJobRemoved(jobID) =>
-      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
-      val responses = gateways.map{
-        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
-      }
-
-      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
-      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
-      import context.dispatcher
-      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
-
-    case CheckIfJobRemoved(jobID) =>
-      if(currentJobs.contains(jobID)) {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID))
-        )(context.dispatcher, sender())
-      } else {
-        sender() ! decorateMessage(true)
-      }
-
-    case NotifyWhenTaskManagerTerminated(taskManager) =>
-      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
-      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
-    case msg@Terminated(taskManager) =>
-      super.handleMessage(msg)
-
-      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-        _ foreach {
-          listener =>
-            listener ! decorateMessage(TaskManagerTerminated(taskManager))
-        }
-      }
-
-    // see shutdown method for reply
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case NotifyWhenAccumulatorChange(jobID) =>
-
-      val (updated, registered) = waitForAccumulatorUpdate.
-        getOrElse(jobID, (false, Set[ActorRef]()))
-      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
-      sender ! true
-
-    /**
-     * Notification from the task manager that changed accumulator are transferred on next
-     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
-     */
-    case AccumulatorsChanged(jobID: JobID) =>
-      waitForAccumulatorUpdate.get(jobID) match {
-        case Some((updated, registered)) =>
-          waitForAccumulatorUpdate.put(jobID, (true, registered))
-        case None =>
-      }
-
-    /**
-     * Disabled async processing of accumulator values and send accumulators to the listeners if
-     * we previously received an [[AccumulatorsChanged]] message.
-     */
-    case msg : Heartbeat =>
-      super.handleMessage(msg)
-
-      waitForAccumulatorUpdate foreach {
-        case (jobID, (updated, actors)) if updated =>
-          currentJobs.get(jobID) match {
-            case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
-              val userAccumulators = graph.aggregateUserAccumulators
-              actors foreach {
-                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
-              }
-            case None =>
-          }
-          waitForAccumulatorUpdate.put(jobID, (false, actors))
-        case _ =>
-      }
-
-    case RequestWorkingTaskManager(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          if(eg.getAllExecutionVertices.asScala.isEmpty){
-            sender ! decorateMessage(WorkingTaskManager(None))
-          } else {
-            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
-            if(resource == null){
-              sender ! decorateMessage(WorkingTaskManager(None))
-            } else {
-              sender ! decorateMessage(
-                WorkingTaskManager(
-                  Some(resource.getTaskManagerActorGateway())
-                )
-              )
-            }
-          }
-        case None => sender ! decorateMessage(WorkingTaskManager(None))
-      }
-
-    case NotifyWhenJobStatus(jobID, state) =>
-      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
-        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
-      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
-      jobStatusListener += state -> (listener + sender)
-
-    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
-      super.handleMessage(msg)
-
-      val cleanup = waitForJobStatus.get(jobID) match {
-        case Some(stateListener) =>
-          stateListener.remove(newJobStatus) match {
-            case Some(listeners) =>
-              listeners foreach {
-                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
-              }
-            case _ =>
-          }
-          stateListener.isEmpty
-
-        case _ => false
-      }
-
-      if (cleanup) {
-        waitForJobStatus.remove(jobID)
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case DisablePostStop =>
-      postStopEnabled = false
-
-    case RequestSavepoint(savepointPath) =>
-      try {
-        val savepoint = savepointStore.loadSavepoint(savepointPath)
-        sender ! ResponseSavepoint(savepoint)
-      }
-      catch {
-        case e: Exception =>
-          sender ! ResponseSavepoint(null)
-      }
-
-    case msg: Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val taskManager = sender()
-
-        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-          _ foreach {
-            listener =>
-              listener ! decorateMessage(TaskManagerTerminated(taskManager))
-          }
-        }
-      }
-
-    case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
-        sender() ! true
-      } else {
-        waitForLeader += sender()
-      }
-
-    case msg: GrantLeadership =>
-      super.handleMessage(msg)
-
-      waitForLeader.foreach(_ ! true)
-
-      waitForLeader.clear()
-
-    case NotifyWhenClientConnects =>
-      waitForClient += sender()
-      sender() ! true
-
-    case msg: RegisterJobClient =>
-      super.handleMessage(msg)
-      waitForClient.foreach(_ ! ClientConnected)
-    case msg: RequestClassloadingProps =>
-      super.handleMessage(msg)
-      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
-
-    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
-      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
-        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
-        sender() ! Acknowledge
-      } else {
-        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
-        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
-      }
-
-    // TaskManager may be registered on these two messages
-    case msg @ (_: RegisterTaskManager) =>
-      super.handleMessage(msg)
-
-      // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or
-      // fewer registered TaskManagers
-      while (waitForNumRegisteredTaskManagers.nonEmpty &&
-        waitForNumRegisteredTaskManagers.head._1 <=
-          instanceManager.getNumberOfRegisteredTaskManagers) {
-        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
-        receiver ! Acknowledge
-      }
-  }
-
-  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
-      case None => false
-    }
-  }
-
-  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall {
-          case vertex =>
-            (vertex.getExecutionState == ExecutionState.RUNNING
-              || vertex.getExecutionState == ExecutionState.FINISHED)
-        }
-      case None => false
-    }
-  }
-
-  def notifyListeners(jobID: JobID): Unit = {
-    if(checkIfAllVerticesRunning(jobID)) {
-      waitForAllVerticesToBeRunning.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-
-    if(checkIfAllVerticesRunningOrFinished(jobID)) {
-      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
deleted file mode 100644
index f121305..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import java.util.Map
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.ActorGateway
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-
-object TestingJobManagerMessages {
-
-  case class RequestExecutionGraph(jobID: JobID)
-
-  sealed trait ResponseExecutionGraph {
-    def jobID: JobID
-  }
-
-  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
-  ResponseExecutionGraph
-
-  case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
-
-  case class WaitForAllVerticesToBeRunning(jobID: JobID)
-  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
-  case class AllVerticesRunning(jobID: JobID)
-
-  case class NotifyWhenJobRemoved(jobID: JobID)
-
-  case class RequestWorkingTaskManager(jobID: JobID)
-  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
-
-  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
-  case class JobStatusIs(jobID: JobID, state: JobStatus)
-
-  case object NotifyListeners
-
-  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
-  case class TaskManagerTerminated(taskManager: ActorRef)
-
-  /**
-   * Registers a listener to receive a message when accumulators changed.
-   * The change must be explicitly triggered by the TestingTaskManager which can receive an
-   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
-   * message by a task that changed the accumulators. This message is then
-   * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
-   * message when the next Heartbeat occurs.
-   */
-  case class NotifyWhenAccumulatorChange(jobID: JobID)
-
-  /**
-   * Reports updated accumulators back to the listener.
-   */
-  case class UpdatedAccumulators(jobID: JobID,
-    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
-    userAccumulators: Map[String, Accumulator[_,_]])
-
-  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
-   *
-   */
-  case object NotifyWhenLeader
-
-  /**
-    * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
-    */
-  case object NotifyWhenClientConnects
-  /**
-    * Notifes of client connect
-    */
-  case object ClientConnected
-  /**
-    * Notifies when the client has requested class loading information
-    */
-  case object ClassLoadingPropsDelivered
-
-  /**
-   * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
-   * message when at least numRegisteredTaskManager have registered at the JobManager.
-   *
-   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
-   */
-  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
-
-  /** Disables the post stop method of the [[TestingJobManager]].
-    *
-    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
-    */
-  case object DisablePostStop
-
-  /**
-    * Requests a savepoint from the job manager.
-    *
-    * @param savepointPath The path of the savepoint to request.
-    */
-  case class RequestSavepoint(savepointPath: String)
-
-  /**
-    * Response to a savepoint request.
-    *
-    * @param savepoint The requested savepoint or null if none available.
-    */
-  case class ResponseSavepoint(savepoint: Savepoint)
-
-  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
-  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
-  def getDisablePostStop(): AnyRef = DisablePostStop
-
-  def getClientConnected(): AnyRef = ClientConnected
-  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
deleted file mode 100644
index 48a1ddd..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.jobmanager.MemoryArchivist
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
-
-/** Memory archivist extended by testing messages
-  *
-  * @param maxEntries number of maximum number of archived jobs
-  */
-class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
-
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case RequestExecutionGraph(jobID) =>
-      val executionGraph = graphs.get(jobID)
-      
-      executionGraph match {
-        case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
-        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
deleted file mode 100644
index 91d169a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-
-object TestingMessages {
-
-  case class CheckIfJobRemoved(jobID: JobID)
-
-  case object DisableDisconnect
-
-  case object Alive
-
-  def getAlive: AnyRef = Alive
-
-  def getDisableDisconnect: AnyRef = DisableDisconnect
-
-  case object NotifyOfComponentShutdown
-  case class ComponentShutdown(ref: ActorRef)
-
-  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
deleted file mode 100644
index 9b5a147..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
-
-import scala.language.postfixOps
-
-/** Subclass of the [[TaskManager]] to support testing messages
- */
-class TestingTaskManager(
-                          config: TaskManagerConfiguration,
-                          resourceID: ResourceID,
-                          connectionInfo: TaskManagerLocation,
-                          memoryManager: MemoryManager,
-                          ioManager: IOManager,
-                          network: NetworkEnvironment,
-                          numberOfSlots: Int,
-                          leaderRetrievalService: LeaderRetrievalService)
-  extends TaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    numberOfSlots,
-    leaderRetrievalService)
-  with TestingTaskManagerLike {
-
-  def this(
-            config: TaskManagerConfiguration,
-            connectionInfo: TaskManagerLocation,
-            memoryManager: MemoryManager,
-            ioManager: IOManager,
-            network: NetworkEnvironment,
-            numberOfSlots: Int,
-            leaderRetrievalService: LeaderRetrievalService) {
-    this(
-      config,
-      ResourceID.generate(),
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
-      numberOfSlots,
-      leaderRetrievalService)
-  }
-}