You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:49 UTC

[4/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Rename _configuration to originalConfiguration

Remove testing classes from main scope in flink-runtime

Previously, the ForkableFlinkMiniCluster which resided in flink-test-utils required
these files to be in the main scope of flink-runtime. With the removal of the
ForkableFlinkMiniCluster, these classes are now no longer needed and can be moved
back to the test scope.

This closes #2450.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b852e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b852e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b852e3

Branch: refs/heads/master
Commit: 02b852e3571e46f25fdfc79f43ceb726ddff9ba7
Parents: 920cda4
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 31 17:58:09 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:17:28 2016 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   7 +-
 .../flink/contrib/streaming/CollectITCase.java  |   4 +-
 .../operations/DegreesWithExceptionITCase.java  |   6 +-
 .../ReduceOnEdgesWithExceptionITCase.java       |   6 +-
 .../ReduceOnNeighborsWithExceptionITCase.java   |   6 +-
 .../apache/flink/ml/util/FlinkTestBase.scala    |  11 +-
 .../clusterframework/FlinkResourceManager.java  |  13 +-
 .../testutils/TestingResourceManager.java       | 137 ------
 .../flink/runtime/jobmanager/JobManager.scala   |  45 +-
 .../runtime/messages/TaskManagerMessages.scala  |  26 ++
 .../runtime/minicluster/FlinkMiniCluster.scala  |  73 +++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 235 ++++++++---
 .../flink/runtime/taskmanager/TaskManager.scala | 130 ++++--
 .../testingUtils/TestingJobManager.scala        |  72 ----
 .../testingUtils/TestingJobManagerLike.scala    | 417 -------------------
 .../TestingJobManagerMessages.scala             | 133 ------
 .../testingUtils/TestingMemoryArchivist.scala   |  43 --
 .../runtime/testingUtils/TestingMessages.scala  |  40 --
 .../testingUtils/TestingTaskManager.scala       |  70 ----
 .../testingUtils/TestingTaskManagerLike.scala   | 248 -----------
 .../TestingTaskManagerMessages.scala            |  94 -----
 .../LeaderElectionRetrievalTestingCluster.java  |   3 +-
 .../testutils/TestingResourceManager.java       | 137 ++++++
 .../runtime/testingUtils/TestingCluster.scala   | 322 ++++++++------
 .../testingUtils/TestingJobManager.scala        |  71 ++++
 .../testingUtils/TestingJobManagerLike.scala    | 417 +++++++++++++++++++
 .../TestingJobManagerMessages.scala             | 132 ++++++
 .../testingUtils/TestingMemoryArchivist.scala   |  43 ++
 .../runtime/testingUtils/TestingMessages.scala  |  40 ++
 .../testingUtils/TestingTaskManager.scala       |  70 ++++
 .../testingUtils/TestingTaskManagerLike.scala   | 234 +++++++++++
 .../TestingTaskManagerMessages.scala            |  82 ++++
 .../flink/api/scala/ScalaShellITCase.scala      |   7 +-
 .../cassandra/CassandraConnectorITCase.java     |   6 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   6 +-
 .../connectors/kafka/KafkaTestBase.java         |   6 +-
 .../manualtests/ManualExactlyOnceTest.java      |   4 +-
 ...nualExactlyOnceWithStreamReshardingTest.java |   4 +-
 ...ScalaStreamingMultipleProgramsTestBase.scala |   5 +-
 .../flink-test-utils/pom.xml                    | 149 -------
 .../util/StreamingMultipleProgramsTestBase.java |   4 +-
 .../streaming/util/TestStreamEnvironment.java   |   8 +-
 .../flink/test/util/AbstractTestBase.java       |   3 +-
 .../test/util/MultipleProgramsTestBase.java     |   3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  31 +-
 .../apache/flink/test/util/TestEnvironment.java |   7 +-
 .../test/util/ForkableFlinkMiniCluster.scala    | 335 ---------------
 .../accumulators/AccumulatorErrorITCase.java    |   6 +-
 .../accumulators/AccumulatorLiveITCase.java     |   1 -
 .../test/cancelling/CancelingTestBase.java      |   7 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   6 +-
 .../EventTimeWindowCheckpointingITCase.java     |   6 +-
 .../test/checkpointing/RescalingITCase.java     |   6 +-
 .../test/checkpointing/SavepointITCase.java     |  19 +-
 .../StreamCheckpointNotifierITCase.java         |   6 +-
 .../StreamFaultToleranceTestBase.java           |   6 +-
 .../WindowCheckpointingITCase.java              |   6 +-
 .../test/classloading/ClassLoaderITCase.java    |   7 +-
 .../clients/examples/JobRetrievalITCase.java    |   5 +-
 .../JobSubmissionFailsITCase.java               |   6 +-
 .../CustomDistributionITCase.java               |   4 +-
 .../RemoteEnvironmentITCase.java                |   7 +-
 .../flink/test/misc/AutoParallelismITCase.java  |   6 +-
 .../test/misc/CustomSerializationITCase.java    |   6 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   6 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |   6 +-
 .../flink/test/query/QueryableStateITCase.java  |   6 +-
 .../flink/test/recovery/FastFailuresITCase.java |   4 +-
 ...SimpleRecoveryFailureRateStrategyITBase.java |   6 +-
 ...RecoveryFixedDelayRestartStrategyITBase.java |   6 +-
 .../test/recovery/SimpleRecoveryITCaseBase.java |   4 +-
 .../TaskManagerFailureRecoveryITCase.java       |   6 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |   6 +-
 .../ZooKeeperLeaderElectionITCase.java          |  56 +--
 .../test/streaming/runtime/TimestampITCase.java |   6 +-
 .../flink/test/web/WebFrontendITCase.java       |   6 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   8 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  12 +-
 flink-yarn-tests/pom.xml                        |   8 +
 .../org/apache/flink/yarn/YarnTestBase.java     |   1 -
 tools/maven/scalastyle-config.xml               |   2 +-
 81 files changed, 2037 insertions(+), 2167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 29a7e58..1030ff8 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,12 +25,11 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class AvroExternalJarProgramITCase {
 
 	private static final String JAR_FILE = "maven-test-jar.jar";
@@ -40,12 +39,12 @@ public class AvroExternalJarProgramITCase {
 	@Test
 	public void testExternalProgram() {
 
-		ForkableFlinkMiniCluster testMiniCluster = null;
+		LocalFlinkMiniCluster testMiniCluster = null;
 
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
 			testMiniCluster.start();
 
 			String jarFile = JAR_FILE;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 10ea85c..d691621 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -19,9 +19,9 @@
 package org.apache.flink.contrib.streaming;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class CollectITCase {
 
 	@Test
 	public void testCollect() throws Exception {
-		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+		final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
 		try {
 			cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 551a97b..02eea07 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.apache.flink.types.LongValue;
 import org.junit.AfterClass;
@@ -39,7 +39,7 @@ public class DegreesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 	
 
 	@BeforeClass
@@ -47,7 +47,7 @@ public class DegreesWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 56a0a59..666f7ef 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -42,7 +42,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -50,7 +50,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 7458e08..0bbdc84 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
 import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -43,7 +43,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 
 	@BeforeClass
@@ -51,7 +51,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index fb98f24..6353d6a 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
 import org.scalatest.{BeforeAndAfter, Suite}
 
-/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
+/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
   * Additionally a TestEnvironment with the started cluster is created and set as the default
   * [[org.apache.flink.api.java.ExecutionEnvironment]].
   *
-  * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given
+  * This mixin starts a LocalFlinkMiniCluster with one TaskManager and a number of slots given
   * by parallelism. This value can be overridden in a sub class in order to start the cluster
   * with a different number of slots.
   *
@@ -37,7 +38,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
   * @example
   *          {{{
   *            def testSomething: Unit = {
-  *             // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+  *             // Obtain TestEnvironment with started LocalFlinkMiniCluster
   *             val env = ExecutionEnvironment.getExecutionEnvironment
   *
   *             env.fromCollection(...)
@@ -50,7 +51,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
 trait FlinkTestBase extends BeforeAndAfter {
   that: Suite =>
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
   val parallelism = 4
 
   before {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 95be084..7ea286d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -767,8 +767,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			Class<? extends FlinkResourceManager<?>> resourceManagerClass,
 			String resourceManagerActorName) {
 
-		Props resourceMasterProps = Props.create(resourceManagerClass, configuration, leaderRetriever);
+		Props resourceMasterProps = getResourceManagerProps(
+			resourceManagerClass,
+			configuration,
+			leaderRetriever);
 
 		return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName);
 	}
+
+	public static Props getResourceManagerProps(
+		Class<? extends FlinkResourceManager> resourceManagerClass,
+		Configuration configuration,
+		LeaderRetrievalService leaderRetrievalService) {
+
+		return Props.create(resourceManagerClass, configuration, leaderRetrievalService);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
deleted file mode 100644
index 495cacd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * A testing resource manager which may alter the default standalone resource master's behavior.
- */
-public class TestingResourceManager extends StandaloneResourceManager {
-
-	/** Set of Actors which want to be informed of a connection to the job manager */
-	private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
-
-	/** Set of Actors which want to be informed of a shutdown */
-	private Set<ActorRef> waitForShutdown = new HashSet<>();
-
-	/** Flag to signal a connection to the JobManager */
-	private boolean isConnected = false;
-
-	public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
-		super(flinkConfig, leaderRetriever);
-	}
-
-	/**
-	 * Overwrite messages here if desired
-	 */
-	@Override
-	protected void handleMessage(Object message) {
-
-		if (message instanceof GetRegisteredResources) {
-			sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
-		} else if (message instanceof FailResource) {
-			ResourceID resourceID = ((FailResource) message).resourceID;
-			notifyWorkerFailed(resourceID, "Failed for test case.");
-
-		} else if (message instanceof NotifyWhenResourceManagerConnected) {
-			if (isConnected) {
-				sender().tell(
-					Messages.getAcknowledge(),
-					self());
-			} else {
-				waitForResourceManagerConnected.add(sender());
-			}
-		} else if (message instanceof RegisterResourceManagerSuccessful) {
-			super.handleMessage(message);
-
-			isConnected = true;
-
-			for (ActorRef ref : waitForResourceManagerConnected) {
-				ref.tell(
-					Messages.getAcknowledge(),
-					self());
-			}
-			waitForResourceManagerConnected.clear();
-
-		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
-			waitForShutdown.add(sender());
-		} else if (message instanceof TestingMessages.Alive$) {
-			sender().tell(Messages.getAcknowledge(), self());
-		} else {
-			super.handleMessage(message);
-		}
-	}
-
-	/**
-	 * Testing messages
-	 */
-	public static class GetRegisteredResources {}
-
-	public static class GetRegisteredResourcesReply {
-
-		public Collection<ResourceID> resources;
-
-		public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
-			this.resources = resources;
-		}
-
-	}
-
-	/**
-	 * Fails all resources that the resource manager has registered
-	 */
-	public static class FailResource {
-
-		public ResourceID resourceID;
-
-		public FailResource(ResourceID resourceID) {
-			this.resourceID = resourceID;
-		}
-	}
-
-	/**
-	 * The sender of this message will be informed of a connection to the Job Manager
-	 */
-	public static class NotifyWhenResourceManagerConnected {}
-
-	/**
-	 * Inform registered listeners about a shutdown of the application.
-     */
-	@Override
-	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
-		for (ActorRef listener : waitForShutdown) {
-			listener.tell(new TestingMessages.ComponentShutdown(self()), self());
-		}
-		waitForShutdown.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 88af604..f67be0e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -2721,7 +2721,7 @@ object JobManager {
       configuration,
       None)
 
-    val archiveProps = Props(archiveClass, archiveCount)
+    val archiveProps = getArchiveProps(archiveClass, archiveCount)
 
     // start the archiver with the given name, or without (avoid name conflicts)
     val archive: ActorRef = archiveActorName match {
@@ -2729,7 +2729,7 @@ object JobManager {
       case None => actorSystem.actorOf(archiveProps)
     }
 
-    val jobManagerProps = Props(
+    val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
       executorService,
@@ -2754,6 +2754,45 @@ object JobManager {
     (jobManager, archive)
   }
 
+  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+    Props(archiveClass, archiveCount)
+  }
+
+  def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry]): Props = {
+
+    Props(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+  }
+
   // --------------------------------------------------------------------------
   //  Resolving the JobManager endpoint
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 2d99245..b433015 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -130,6 +130,16 @@ object TaskManagerMessages {
     */
   case class RequestTaskManagerLog(requestType : LogTypeRequest)
 
+  /** Requests the number of active connections at the ConnectionManager */
+  case object RequestNumActiveConnections
+
+  case class ResponseNumActiveConnections(number: Int)
+
+  /** Requests the number of broadcast variables with references */
+  case object RequestBroadcastVariablesWithReferences
+
+  case class ResponseBroadcastVariablesWithReferences(number: Int)
+
 
   // --------------------------------------------------------------------------
   //  Utility getters for case objects to simplify access from Java
@@ -166,4 +176,20 @@ object TaskManagerMessages {
   def getRequestTaskManagerStdout(): AnyRef = {
     RequestTaskManagerLog(StdOutFileRequest)
   }
+
+  /**
+    * Accessor for the case object instance, to simplify Java interoperability.
+    * @return The RequestBroadcastVariablesWithReferences case object instance.
+    */
+  def getRequestBroadcastVariablesWithReferences(): RequestBroadcastVariablesWithReferences.type = {
+    RequestBroadcastVariablesWithReferences
+  }
+
+  /**
+    * Accessor for the case object instance, to simplify Java interoperability.
+    * @return The RequestNumActiveConnections case object instance.
+    */
+  def getRequestNumActiveConnections(): RequestNumActiveConnections.type  = {
+    RequestNumActiveConnections
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index a547d25..0178bd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
     ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
     InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val originalConfiguration = generateConfiguration(userConfiguration)
 
   /** Future to the [[ActorGateway]] of the current leader */
   var leaderGateway: Promise[ActorGateway] = Promise()
@@ -79,16 +79,16 @@ abstract class FlinkMiniCluster(
 
   /** Future lock */
   val futureLock = new Object()
-  
+
   implicit val executionContext = ExecutionContext.global
 
-  implicit val timeout = AkkaUtils.getTimeout(configuration)
+  implicit val timeout = AkkaUtils.getTimeout(originalConfiguration)
 
-  val haMode = HighAvailabilityMode.fromConfig(configuration)
+  val haMode = HighAvailabilityMode.fromConfig(originalConfiguration)
 
   val numJobManagers = getNumberOfJobManagers
 
-  var numTaskManagers = configuration.getInteger(
+  var numTaskManagers = originalConfiguration.getInteger(
     ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
     ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
 
@@ -105,6 +105,22 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  def configuration: Configuration = {
+    if (originalConfiguration.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+      val leaderConfiguration = new Configuration(originalConfiguration)
+
+      val leaderPort = getLeaderRPCPort
+
+      leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+
+      leaderConfiguration
+    } else {
+      originalConfiguration
+    }
+  }
+
   // --------------------------------------------------------------------------
   //                           Abstract Methods
   // --------------------------------------------------------------------------
@@ -125,7 +141,7 @@ abstract class FlinkMiniCluster(
     if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
-      configuration.getInteger(
+      originalConfiguration.getInteger(
         ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
       )
@@ -136,7 +152,7 @@ abstract class FlinkMiniCluster(
     if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
-      configuration.getInteger(
+      originalConfiguration.getInteger(
         ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
       )
@@ -177,40 +193,55 @@ abstract class FlinkMiniCluster(
     Await.result(indexFuture, timeout)
   }
 
+  def getLeaderRPCPort: Int = {
+    val index = getLeaderIndex(timeout)
+
+    jobManagerActorSystems match {
+      case Some(jmActorSystems) =>
+        AkkaUtils.getAddress(jmActorSystems(index)).port match {
+          case Some(p) => p
+          case None => -1
+        }
+
+      case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+                                         "started properly.")
+    }
+  }
+
   def getResourceManagerAkkaConfig(index: Int): Config = {
     if (useSingleActorSystem) {
-      AkkaUtils.getAkkaConfig(configuration, None)
+      AkkaUtils.getAkkaConfig(originalConfiguration, None)
     } else {
-      val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+                                                  ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 
-      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+      AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
     }
   }
 
   def getJobManagerAkkaConfig(index: Int): Config = {
     if (useSingleActorSystem) {
-      AkkaUtils.getAkkaConfig(configuration, None)
+      AkkaUtils.getAkkaConfig(originalConfiguration, None)
     }
     else {
-      val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+                                                  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 
-      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+      AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
     }
   }
 
   def getTaskManagerAkkaConfig(index: Int): Config = {
-    val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+                                                ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
 
     val resolvedPort = if(port != 0) port + index else port
 
-    AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+    AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
   }
 
   /**
@@ -257,7 +288,7 @@ abstract class FlinkMiniCluster(
           "The FlinkMiniCluster has not been started yet.")
       }
     } else {
-      JobClient.startJobClientActorSystem(configuration)
+      JobClient.startJobClientActorSystem(originalConfiguration)
     }
   }
 
@@ -320,7 +351,7 @@ abstract class FlinkMiniCluster(
 
     val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
 
-    webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+    webMonitor = startWebServer(originalConfiguration, jmActorSystems(0), jobManagerAkkaURL)
 
     if(waitForTaskManagerRegistration) {
       waitForTaskManagersToBeRegistered()
@@ -528,7 +559,7 @@ abstract class FlinkMiniCluster(
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {
-          ZooKeeperUtils.createLeaderRetrievalService(configuration)
+          ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration)
         }
 
       case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d30c047..cac5d91 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,23 +18,36 @@
 
 package org.apache.flink.runtime.minicluster
 
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.JobID
+import java.util.concurrent.ExecutorService
 
+import akka.actor.{ActorRef, ActorSystem, Props}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
-import org.apache.flink.runtime.messages.JobManagerMessages.{CancellationFailure, CancellationResponse, StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs}
-import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 
 import scala.concurrent.Await
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -65,8 +78,25 @@ class LocalFlinkMiniCluster(
     config
   }
 
+  //------------------------------------------------------------------------------------------------
+  // Actor classes
+  //------------------------------------------------------------------------------------------------
+
+  val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager]
+
+  val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager]
+
+  val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist]
+
+  val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+    classOf[StandaloneResourceManager]
+
+  //------------------------------------------------------------------------------------------------
+  // Start methods for the distributed components
+  //------------------------------------------------------------------------------------------------
+
   override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val jobManagerName = getJobManagerName(index)
     val archiveName = getArchiveName(index)
@@ -79,19 +109,48 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (jobManager, _) = JobManager.startJobManagerActors(
-      config,
-      system,
-      Some(jobManagerName),
-      Some(archiveName),
-      classOf[JobManager],
-      classOf[MemoryArchivist])
-
-    jobManager
+    val (executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    restartStrategyFactory,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobGraphStore,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+
+    val archive = system.actorOf(
+      getArchiveProps(
+        memoryArchivistClass,
+        archiveCount),
+      archiveName)
+
+    system.actorOf(
+      getJobManagerProps(
+        jobManagerClass,
+        config,
+        executorService,
+        instanceManager,
+        scheduler,
+        libraryCacheManager,
+        archive,
+        restartStrategyFactory,
+        timeout,
+        leaderElectionService,
+        submittedJobGraphStore,
+        checkpointRecoveryFactory,
+        savepointStore,
+        jobRecoveryTimeout,
+        metricsRegistry),
+      jobManagerName)
   }
 
   override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val resourceManagerName = getResourceManagerName(index)
 
@@ -103,18 +162,16 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
     }
 
-    val resourceManager = FlinkResourceManager.startResourceManagerActors(
+    val resourceManagerProps = getResourceManagerProps(
+      resourceManagerClass,
       config,
-      system,
-      createLeaderRetrievalService(),
-      classOf[StandaloneResourceManager],
-      resourceManagerName)
+      createLeaderRetrievalService())
 
-    resourceManager
+    system.actorOf(resourceManagerProps, resourceManagerName)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
+    val config = originalConfiguration.clone()
 
     val rpcPort = config.getInteger(
       ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
@@ -138,32 +195,115 @@ class LocalFlinkMiniCluster(
     } else {
       TaskManager.TASK_MANAGER_NAME
     }
-    
-    TaskManager.startTaskManagerComponentsAndActor(
+
+    val resourceID = ResourceID.generate() // generate random resource id
+
+    val (taskManagerConfig,
+    taskManagerLocation,
+    memoryManager,
+    ioManager,
+    network,
+    leaderRetrievalService) = TaskManager.createTaskManagerComponents(
       config,
-      ResourceID.generate(), // generate random resource id
-      system,
+      resourceID,
       hostname, // network interface to bind to
-      Some(taskManagerActorName), // actor name
-      Some(createLeaderRetrievalService()), // job manager leader retrieval service
       localExecution, // start network stack?
-      classOf[TaskManager])
+      Some(createLeaderRetrievalService()))
+
+    val props = getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService)
+
+    system.actorOf(props, taskManagerActorName)
   }
 
-  def getLeaderRPCPort: Int = {
-    val index = getLeaderIndex(timeout)
+  //------------------------------------------------------------------------------------------------
+  // Props for the distributed components
+  //------------------------------------------------------------------------------------------------
 
-    jobManagerActorSystems match {
-      case Some(jmActorSystems) =>
-        AkkaUtils.getAddress(jmActorSystems(index)).port match {
-          case Some(p) => p
-          case None => -1
-        }
+  def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+    JobManager.getArchiveProps(archiveClass, archiveCount)
+  }
 
-      case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
-        "started properly.")
-    }
+  def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry]): Props = {
+
+    JobManager.getJobManagerProps(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+  }
+
+  def getTaskManagerProps(
+    taskManagerClass: Class[_ <: TaskManager],
+    taskManagerConfig: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    taskManagerLocation: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    networkEnvironment: NetworkEnvironment,
+    leaderRetrievalService: LeaderRetrievalService): Props = {
+
+    TaskManager.getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      networkEnvironment,
+      leaderRetrievalService)
+  }
+
+  def getResourceManagerProps(
+    resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]],
+    configuration: Configuration,
+    leaderRetrievalService: LeaderRetrievalService): Props = {
+
+    FlinkResourceManager.getResourceManagerProps(
+      resourceManagerClass,
+      configuration,
+      leaderRetrievalService)
+  }
+
+  //------------------------------------------------------------------------------------------------
+  // Helper methods
+  //------------------------------------------------------------------------------------------------
 
+  def createLeaderElectionService(): Option[LeaderElectionService] = {
+    None
   }
 
   def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -186,7 +326,7 @@ class LocalFlinkMiniCluster(
       val bufferSize: Int = config.getInteger(
         ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
         ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-      
+
       val bufferMem: Long = config.getLong(
         ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
         ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
@@ -218,6 +358,7 @@ class LocalFlinkMiniCluster(
     val config: Configuration = new Configuration()
 
     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
       ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -252,11 +393,11 @@ class LocalFlinkMiniCluster(
       JobManager.ARCHIVE_NAME
     }
   }
-  
+
   // --------------------------------------------------------------------------
   //  Actions on running jobs
   // --------------------------------------------------------------------------
-  
+
   def currentlyRunningJobs: Iterable[JobID] = {
     val leader = getLeaderGateway(timeout)
     val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout)
@@ -269,7 +410,7 @@ class LocalFlinkMiniCluster(
     currentlyRunningJobs.foreach(list.add)
     list
   }
-  
+
   def stopJob(id: JobID) : Unit = {
     val leader = getLeaderGateway(timeout)
     val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 84750a3..de85f30 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -354,6 +354,21 @@ class TaskManager(
         case None =>
           sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
       }
+
+    case RequestBroadcastVariablesWithReferences =>
+      sender ! decorateMessage(
+        ResponseBroadcastVariablesWithReferences(
+          bcVarManager.getNumberOfVariablesWithReferences)
+      )
+
+    case RequestNumActiveConnections =>
+      val numActive = if (!network.isShutdown) {
+        network.getConnectionManager.getNumberOfActiveConnections
+      } else {
+        0
+      }
+
+      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
   }
 
   /**
@@ -1781,6 +1796,7 @@ object TaskManager {
   }
 
   /**
+   * Starts the task manager actor.
    *
    * @param configuration The configuration for the TaskManager.
    * @param resourceID The id of the resource which the task manager will run on.
@@ -1817,11 +1833,75 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
-    val (taskManagerConfig : TaskManagerConfiguration,      
-      netConfig: NetworkEnvironmentConfiguration,
-      taskManagerAddress: InetSocketAddress,
-      memType: MemoryType
-    ) = parseTaskManagerConfiguration(
+    val (taskManagerConfig,
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService) = createTaskManagerComponents(
+      configuration,
+      resourceID,
+      taskManagerHostname,
+      localTaskManagerCommunication,
+      leaderRetrievalServiceOption)
+
+    // create the actor properties (which define the actor constructor parameters)
+    val tmProps = getTaskManagerProps(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      leaderRetrievalService)
+
+    taskManagerActorName match {
+      case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
+      case None => actorSystem.actorOf(tmProps)
+    }
+  }
+
+  def getTaskManagerProps(
+    taskManagerClass: Class[_ <: TaskManager],
+    taskManagerConfig: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    taskManagerLocation: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    networkEnvironment: NetworkEnvironment,
+    leaderRetrievalService: LeaderRetrievalService
+  ): Props = {
+    Props(
+      taskManagerClass,
+      taskManagerConfig,
+      resourceID,
+      taskManagerLocation,
+      memoryManager,
+      ioManager,
+      networkEnvironment,
+      taskManagerConfig.numberOfSlots,
+      leaderRetrievalService)
+  }
+
+  def createTaskManagerComponents(
+    configuration: Configuration,
+    resourceID: ResourceID,
+    taskManagerHostname: String,
+    localTaskManagerCommunication: Boolean,
+    leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
+      (TaskManagerConfiguration,
+      TaskManagerLocation,
+      MemoryManager,
+      IOManager,
+      NetworkEnvironment,
+      LeaderRetrievalService) = {
+
+    val (taskManagerConfig : TaskManagerConfiguration,
+    netConfig: NetworkEnvironmentConfiguration,
+    taskManagerAddress: InetSocketAddress,
+    memType: MemoryType
+      ) = parseTaskManagerConfiguration(
       configuration,
       taskManagerHostname,
       localTaskManagerCommunication)
@@ -1895,10 +1975,10 @@ object TaskManager {
     // check if a value has been configured
     val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
     checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-      ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-      "MemoryManager needs at least one MB of memory. " +
-        "If you leave this config parameter empty, the system automatically " +
-        "pick a fraction of the available memory.")
+                         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+                         "MemoryManager needs at least one MB of memory. " +
+                           "If you leave this config parameter empty, the system automatically " +
+                           "pick a fraction of the available memory.")
 
 
     val preAllocateMemory = configuration.getBoolean(
@@ -1910,7 +1990,7 @@ object TaskManager {
         LOG.info(s"Using $configuredMemory MB for managed memory.")
       } else {
         LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
-          s"memory will be allocated lazily.")
+                   s"memory will be allocated lazily.")
       }
       configuredMemory << 20 // megabytes to bytes
     }
@@ -1928,10 +2008,10 @@ object TaskManager {
 
         if (preAllocateMemory) {
           LOG.info(s"Using $fraction of the currently free heap space for managed " +
-            s"heap memory (${relativeMemSize >> 20} MB).")
+                     s"heap memory (${relativeMemSize >> 20} MB).")
         } else {
           LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
-            s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
+                     s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
         }
 
         relativeMemSize
@@ -1944,10 +2024,10 @@ object TaskManager {
 
         if (preAllocateMemory) {
           LOG.info(s"Using $fraction of the maximum memory size for " +
-            s"managed off-heap memory (${directMemorySize >> 20} MB).")
+                     s"managed off-heap memory (${directMemorySize >> 20} MB).")
         } else {
           LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
-            s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
+                     s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
         }
 
         directMemorySize
@@ -1971,12 +2051,12 @@ object TaskManager {
         memType match {
           case MemoryType.HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
+                      s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
 
           case MemoryType.OFF_HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-              s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
-              s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+                      s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
+                      s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
 
           case _ => throw e
         }
@@ -1990,22 +2070,12 @@ object TaskManager {
       case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
-    // create the actor properties (which define the actor constructor parameters)
-    val tmProps = Props(
-      taskManagerClass,
-      taskManagerConfig,
-      resourceID,
+    (taskManagerConfig,
       taskManagerLocation,
       memoryManager,
       ioManager,
       network,
-      taskManagerConfig.numberOfSlots,
       leaderRetrievalService)
-
-    taskManagerActorName match {
-      case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
-      case None => actorSystem.actorOf(tmProps)
-    }
   }
 
 
@@ -2055,8 +2125,8 @@ object TaskManager {
    * @param taskManagerHostname The host name under which the TaskManager communicates.
    * @param localTaskManagerCommunication True, to skip initializing the network stack.
    *                                      Use only in cases where only one task manager runs.
-   * @return A tuple (TaskManagerConfiguration, network configuration,
-   *                  InstanceConnectionInfo, JobManager actor Akka URL).
+   * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
+    *         memory tyep).
    */
   @throws(classOf[IllegalArgumentException])
   def parseTaskManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
deleted file mode 100644
index 16331ac..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
-
-/** JobManager implementation extended by testing messages
-  *
-  */
-class TestingJobManager(
-    flinkConfiguration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore : SavepointStore,
-    jobRecoveryTimeout : FiniteDuration,
-    metricRegistry : Option[MetricRegistry])
-  extends JobManager(
-    flinkConfiguration,
-      executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricRegistry)
-  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
deleted file mode 100644
index 3947b17..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Cancellable, Terminated}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a JobManager with messages for testing purpose.  */
-trait TestingJobManagerLike extends FlinkActor {
-  that: JobManager =>
-
-  import context._
-
-  import scala.collection.JavaConverters._
-
-  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
-  val waitForAllVerticesToBeRunningOrFinished =
-    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
-  var periodicCheck: Option[Cancellable] = None
-
-  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
-    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
-  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
-  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
-  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
-    new Ordering[(Int, ActorRef)] {
-      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
-
-  val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  var postStopEnabled = true
-
-  abstract override def postStop(): Unit = {
-    if (postStopEnabled) {
-      super.postStop()
-    } else {
-      // only stop leader election service to revoke the leadership of this JM so that a new JM
-      // can be elected leader
-      leaderElectionService.stop()
-    }
-  }
-
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case RequestExecutionGraph(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
-          ExecutionGraphFound(
-            jobID,
-            executionGraph)
-        )
-
-        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
-      }
-
-    case WaitForAllVerticesToBeRunning(jobID) =>
-      if(checkIfAllVerticesRunning(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
-      if(checkIfAllVerticesRunningOrFinished(jobID)){
-        sender() ! decorateMessage(AllVerticesRunning(jobID))
-      }else{
-        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
-        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
-
-        if(periodicCheck.isEmpty){
-          periodicCheck =
-            Some(
-              context.system.scheduler.schedule(
-                0 seconds,
-                200 millis,
-                self,
-                decorateMessage(NotifyListeners)
-              )
-            )
-        }
-      }
-
-    case NotifyListeners =>
-      for(jobID <- currentJobs.keySet){
-        notifyListeners(jobID)
-      }
-
-      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
-        periodicCheck foreach { _.cancel() }
-        periodicCheck = None
-      }
-
-
-    case NotifyWhenJobRemoved(jobID) =>
-      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
-      val responses = gateways.map{
-        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
-      }
-
-      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
-      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
-      import context.dispatcher
-      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
-
-    case CheckIfJobRemoved(jobID) =>
-      if(currentJobs.contains(jobID)) {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID))
-        )(context.dispatcher, sender())
-      } else {
-        sender() ! decorateMessage(true)
-      }
-
-    case NotifyWhenTaskManagerTerminated(taskManager) =>
-      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
-      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
-    case msg@Terminated(taskManager) =>
-      super.handleMessage(msg)
-
-      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-        _ foreach {
-          listener =>
-            listener ! decorateMessage(TaskManagerTerminated(taskManager))
-        }
-      }
-
-    // see shutdown method for reply
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case NotifyWhenAccumulatorChange(jobID) =>
-
-      val (updated, registered) = waitForAccumulatorUpdate.
-        getOrElse(jobID, (false, Set[ActorRef]()))
-      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
-      sender ! true
-
-    /**
-     * Notification from the task manager that changed accumulator are transferred on next
-     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
-     */
-    case AccumulatorsChanged(jobID: JobID) =>
-      waitForAccumulatorUpdate.get(jobID) match {
-        case Some((updated, registered)) =>
-          waitForAccumulatorUpdate.put(jobID, (true, registered))
-        case None =>
-      }
-
-    /**
-     * Disabled async processing of accumulator values and send accumulators to the listeners if
-     * we previously received an [[AccumulatorsChanged]] message.
-     */
-    case msg : Heartbeat =>
-      super.handleMessage(msg)
-
-      waitForAccumulatorUpdate foreach {
-        case (jobID, (updated, actors)) if updated =>
-          currentJobs.get(jobID) match {
-            case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
-              val userAccumulators = graph.aggregateUserAccumulators
-              actors foreach {
-                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
-              }
-            case None =>
-          }
-          waitForAccumulatorUpdate.put(jobID, (false, actors))
-        case _ =>
-      }
-
-    case RequestWorkingTaskManager(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          if(eg.getAllExecutionVertices.asScala.isEmpty){
-            sender ! decorateMessage(WorkingTaskManager(None))
-          } else {
-            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
-            if(resource == null){
-              sender ! decorateMessage(WorkingTaskManager(None))
-            } else {
-              sender ! decorateMessage(
-                WorkingTaskManager(
-                  Some(resource.getTaskManagerActorGateway())
-                )
-              )
-            }
-          }
-        case None => sender ! decorateMessage(WorkingTaskManager(None))
-      }
-
-    case NotifyWhenJobStatus(jobID, state) =>
-      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
-        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
-      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
-      jobStatusListener += state -> (listener + sender)
-
-    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
-      super.handleMessage(msg)
-
-      val cleanup = waitForJobStatus.get(jobID) match {
-        case Some(stateListener) =>
-          stateListener.remove(newJobStatus) match {
-            case Some(listeners) =>
-              listeners foreach {
-                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
-              }
-            case _ =>
-          }
-          stateListener.isEmpty
-
-        case _ => false
-      }
-
-      if (cleanup) {
-        waitForJobStatus.remove(jobID)
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case DisablePostStop =>
-      postStopEnabled = false
-
-    case RequestSavepoint(savepointPath) =>
-      try {
-        val savepoint = savepointStore.loadSavepoint(savepointPath)
-        sender ! ResponseSavepoint(savepoint)
-      }
-      catch {
-        case e: Exception =>
-          sender ! ResponseSavepoint(null)
-      }
-
-    case msg: Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val taskManager = sender()
-
-        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
-          _ foreach {
-            listener =>
-              listener ! decorateMessage(TaskManagerTerminated(taskManager))
-          }
-        }
-      }
-
-    case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
-        sender() ! true
-      } else {
-        waitForLeader += sender()
-      }
-
-    case msg: GrantLeadership =>
-      super.handleMessage(msg)
-
-      waitForLeader.foreach(_ ! true)
-
-      waitForLeader.clear()
-
-    case NotifyWhenClientConnects =>
-      waitForClient += sender()
-      sender() ! true
-
-    case msg: RegisterJobClient =>
-      super.handleMessage(msg)
-      waitForClient.foreach(_ ! ClientConnected)
-    case msg: RequestClassloadingProps =>
-      super.handleMessage(msg)
-      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
-
-    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
-      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
-        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
-        sender() ! Acknowledge
-      } else {
-        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
-        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
-      }
-
-    // TaskManager may be registered on these two messages
-    case msg @ (_: RegisterTaskManager) =>
-      super.handleMessage(msg)
-
-      // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or
-      // fewer registered TaskManagers
-      while (waitForNumRegisteredTaskManagers.nonEmpty &&
-        waitForNumRegisteredTaskManagers.head._1 <=
-          instanceManager.getNumberOfRegisteredTaskManagers) {
-        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
-        receiver ! Acknowledge
-      }
-  }
-
-  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
-      case None => false
-    }
-  }
-
-  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall {
-          case vertex =>
-            (vertex.getExecutionState == ExecutionState.RUNNING
-              || vertex.getExecutionState == ExecutionState.FINISHED)
-        }
-      case None => false
-    }
-  }
-
-  def notifyListeners(jobID: JobID): Unit = {
-    if(checkIfAllVerticesRunning(jobID)) {
-      waitForAllVerticesToBeRunning.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-
-    if(checkIfAllVerticesRunningOrFinished(jobID)) {
-      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
deleted file mode 100644
index f121305..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import java.util.Map
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.ActorGateway
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-
-object TestingJobManagerMessages {
-
-  case class RequestExecutionGraph(jobID: JobID)
-
-  sealed trait ResponseExecutionGraph {
-    def jobID: JobID
-  }
-
-  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
-  ResponseExecutionGraph
-
-  case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
-
-  case class WaitForAllVerticesToBeRunning(jobID: JobID)
-  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
-  case class AllVerticesRunning(jobID: JobID)
-
-  case class NotifyWhenJobRemoved(jobID: JobID)
-
-  case class RequestWorkingTaskManager(jobID: JobID)
-  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
-
-  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
-  case class JobStatusIs(jobID: JobID, state: JobStatus)
-
-  case object NotifyListeners
-
-  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
-  case class TaskManagerTerminated(taskManager: ActorRef)
-
-  /**
-   * Registers a listener to receive a message when accumulators changed.
-   * The change must be explicitly triggered by the TestingTaskManager which can receive an
-   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
-   * message by a task that changed the accumulators. This message is then
-   * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
-   * message when the next Heartbeat occurs.
-   */
-  case class NotifyWhenAccumulatorChange(jobID: JobID)
-
-  /**
-   * Reports updated accumulators back to the listener.
-   */
-  case class UpdatedAccumulators(jobID: JobID,
-    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
-    userAccumulators: Map[String, Accumulator[_,_]])
-
-  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
-   *
-   */
-  case object NotifyWhenLeader
-
-  /**
-    * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
-    */
-  case object NotifyWhenClientConnects
-  /**
-    * Notifes of client connect
-    */
-  case object ClientConnected
-  /**
-    * Notifies when the client has requested class loading information
-    */
-  case object ClassLoadingPropsDelivered
-
-  /**
-   * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
-   * message when at least numRegisteredTaskManager have registered at the JobManager.
-   *
-   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
-   */
-  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
-
-  /** Disables the post stop method of the [[TestingJobManager]].
-    *
-    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
-    */
-  case object DisablePostStop
-
-  /**
-    * Requests a savepoint from the job manager.
-    *
-    * @param savepointPath The path of the savepoint to request.
-    */
-  case class RequestSavepoint(savepointPath: String)
-
-  /**
-    * Response to a savepoint request.
-    *
-    * @param savepoint The requested savepoint or null if none available.
-    */
-  case class ResponseSavepoint(savepoint: Savepoint)
-
-  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
-  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
-  def getDisablePostStop(): AnyRef = DisablePostStop
-
-  def getClientConnected(): AnyRef = ClientConnected
-  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
deleted file mode 100644
index 48a1ddd..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.jobmanager.MemoryArchivist
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
-
-/** Memory archivist extended by testing messages
-  *
-  * @param maxEntries number of maximum number of archived jobs
-  */
-class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
-
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case RequestExecutionGraph(jobID) =>
-      val executionGraph = graphs.get(jobID)
-      
-      executionGraph match {
-        case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
-        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
deleted file mode 100644
index 91d169a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-
-object TestingMessages {
-
-  case class CheckIfJobRemoved(jobID: JobID)
-
-  case object DisableDisconnect
-
-  case object Alive
-
-  def getAlive: AnyRef = Alive
-
-  def getDisableDisconnect: AnyRef = DisableDisconnect
-
-  case object NotifyOfComponentShutdown
-  case class ComponentShutdown(ref: ActorRef)
-
-  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
deleted file mode 100644
index 9b5a147..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
-
-import scala.language.postfixOps
-
-/** Subclass of the [[TaskManager]] to support testing messages
- */
-class TestingTaskManager(
-                          config: TaskManagerConfiguration,
-                          resourceID: ResourceID,
-                          connectionInfo: TaskManagerLocation,
-                          memoryManager: MemoryManager,
-                          ioManager: IOManager,
-                          network: NetworkEnvironment,
-                          numberOfSlots: Int,
-                          leaderRetrievalService: LeaderRetrievalService)
-  extends TaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    numberOfSlots,
-    leaderRetrievalService)
-  with TestingTaskManagerLike {
-
-  def this(
-            config: TaskManagerConfiguration,
-            connectionInfo: TaskManagerLocation,
-            memoryManager: MemoryManager,
-            ioManager: IOManager,
-            network: NetworkEnvironment,
-            numberOfSlots: Int,
-            leaderRetrievalService: LeaderRetrievalService) {
-    this(
-      config,
-      ResourceID.generate(),
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
-      numberOfSlots,
-      leaderRetrievalService)
-  }
-}