You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:38 UTC

[16/50] [abbrv] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
deleted file mode 100644
index a6963fe..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Terminated}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
-import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
-trait TestingTaskManagerLike extends FlinkActor {
-  that: TaskManager =>
-
-  import scala.collection.JavaConverters._
-
-  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-  val waitForRegisteredAtResourceManager =
-    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
-  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
-  /** Map of registered task submit listeners */
-  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
-
-  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
-  var disconnectDisabled = false
-
-  /**
-   * Handler for testing related messages
-   */
-  abstract override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-
-    case NotifyWhenTaskIsRunning(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
-          sender ! decorateMessage(true)
-
-        case _ =>
-          val listeners = waitForRunning.getOrElse(executionID, Set())
-          waitForRunning += (executionID -> (listeners + sender))
-      }
-
-    case RequestRunningTasks =>
-      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-
-    case NotifyWhenTaskRemoved(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(_) =>
-          val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender))
-        case None =>
-          if(unregisteredTasks.contains(executionID)) {
-            sender ! decorateMessage(true)
-          } else {
-            val set = waitForRemoval.getOrElse(executionID, Set())
-            waitForRemoval += (executionID -> (set + sender))
-          }
-      }
-
-    case TaskInFinalState(executionID) =>
-      super.handleMessage(TaskInFinalState(executionID))
-      waitForRemoval.remove(executionID) match {
-        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
-        case None =>
-      }
-
-      unregisteredTasks += executionID
-
-    case RequestBroadcastVariablesWithReferences =>
-      sender ! decorateMessage(
-        ResponseBroadcastVariablesWithReferences(
-          bcVarManager.getNumberOfVariablesWithReferences)
-      )
-
-    case RequestNumActiveConnections =>
-      val numActive = if (!network.isShutdown) {
-        network.getConnectionManager.getNumberOfActiveConnections
-      } else {
-        0
-      }
-      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
-    case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }else{
-        sender ! decorateMessage(true)
-      }
-
-    case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        sender ! decorateMessage(true)
-      } else {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-            context.dispatcher,
-            sender()
-          )
-      }
-
-    case NotifyWhenJobManagerTerminated(jobManager) =>
-      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
-      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
-    case RegisterSubmitTaskListener(jobId) =>
-      registeredSubmitTaskListeners.put(jobId, sender())
-
-    case msg@SubmitTask(tdd) =>
-      registeredSubmitTaskListeners.get(tdd.getJobID) match {
-        case Some(listenerRef) =>
-          listenerRef ! ResponseSubmitTaskListener(tdd)
-        case None =>
-        // Nothing to do
-      }
-
-      super.handleMessage(msg)
-
-    /**
-     * Message from task manager that accumulator values changed and need to be reported immediately
-     * instead of lazily through the
-     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
-     * message to the job manager that it knows it should report to the listeners.
-     */
-    case msg: AccumulatorsChanged =>
-      currentJobManager match {
-        case Some(jobManager) =>
-          jobManager.forward(msg)
-          sendHeartbeatToJobManager()
-          sender ! true
-        case None =>
-      }
-
-    case msg@Terminated(jobManager) =>
-      super.handleMessage(msg)
-
-      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-        _ foreach {
-          _ ! decorateMessage(JobManagerTerminated(jobManager))
-        }
-      }
-
-    case msg:Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-
-        val jobManager = sender()
-
-        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
-          _ foreach {
-            _ ! decorateMessage(JobManagerTerminated(jobManager))
-          }
-        }
-      }
-
-    case DisableDisconnect =>
-      disconnectDisabled = true
-
-    case NotifyOfComponentShutdown =>
-      waitForShutdown += sender()
-
-    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
-      super.handleMessage(msg)
-
-      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
-        waitForRunning.get(taskExecutionState.getID) foreach {
-          _ foreach (_ ! decorateMessage(true))
-        }
-      }
-
-    case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
-    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
-      if(isConnected && jobManager == currentJobManager.get) {
-        sender() ! true
-      } else {
-        val list = waitForRegisteredAtResourceManager.getOrElse(
-          jobManager,
-          Set[ActorRef]())
-
-        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
-      }
-
-    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
-      super.handleMessage(msg)
-
-      val jm = sender()
-
-      waitForRegisteredAtResourceManager.remove(jm).foreach {
-        listeners => listeners.foreach{
-          listener =>
-            listener ! true
-        }
-      }
-  }
-
-  /**
-    * No killing of the VM for testing.
-    */
-  override protected def shutdown(): Unit = {
-    log.info("Shutting down TestingJobManager.")
-    waitForShutdown.foreach(_ ! ComponentShutdown(self))
-    waitForShutdown.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
deleted file mode 100644
index 974e4e8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-/**
- * Additional messages that the [[TestingTaskManager]] understands.
- */
-object TestingTaskManagerMessages {
-  
-  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
-  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
-  
-  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
-    import collection.JavaConverters._
-    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
-  }
-  
-  case class ResponseBroadcastVariablesWithReferences(number: Int)
-
-  case object RequestNumActiveConnections
-  case class ResponseNumActiveConnections(number: Int)
-  
-  case object RequestRunningTasks
-  
-  case object RequestBroadcastVariablesWithReferences
-
-  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
-
-  case class JobManagerTerminated(jobManager: ActorRef)
-
-  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
-
-  /**
-   * Message to give a hint to the task manager that accumulator values were updated in the task.
-   * This message is forwarded to the job manager which knows that it needs to notify listeners
-   * of accumulator updates.
-   */
-  case class AccumulatorsChanged(jobID: JobID)
-
-  /**
-    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
-    * messages of the given job.
-    *
-    * If a task is submitted with the given job ID the task deployment
-    * descriptor is forwarded to the listener.
-    *
-    * @param jobId The job ID to listen for.
-    */
-  case class RegisterSubmitTaskListener(jobId: JobID)
-
-  /**
-    * A response to a listened job ID containing the submitted task deployment descriptor.
-    *
-    * @param tdd The submitted task deployment descriptor.
-    */
-  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
-
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-  
-  def getRequestRunningTasksMessage: AnyRef = {
-    RequestRunningTasks
-  }
-  
-  def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
-    RequestBroadcastVariablesWithReferences
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index e596166..c143fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -86,7 +85,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	@Override
 	public int getNumberOfJobManagers() {
-		return this.configuration().getInteger(
+		return this.originalConfiguration().getInteger(
 				ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
 				ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
new file mode 100644
index 0000000..495cacd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * A testing resource manager which may alter the default standalone resource master's behavior.
+ */
+public class TestingResourceManager extends StandaloneResourceManager {
+
+	/** Set of Actors which want to be informed of a connection to the job manager */
+	private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
+
+	/** Set of Actors which want to be informed of a shutdown */
+	private Set<ActorRef> waitForShutdown = new HashSet<>();
+
+	/** Flag to signal a connection to the JobManager */
+	private boolean isConnected = false;
+
+	public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
+		super(flinkConfig, leaderRetriever);
+	}
+
+	/**
+	 * Overwrite messages here if desired
+	 */
+	@Override
+	protected void handleMessage(Object message) {
+
+		if (message instanceof GetRegisteredResources) {
+			sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
+		} else if (message instanceof FailResource) {
+			ResourceID resourceID = ((FailResource) message).resourceID;
+			notifyWorkerFailed(resourceID, "Failed for test case.");
+
+		} else if (message instanceof NotifyWhenResourceManagerConnected) {
+			if (isConnected) {
+				sender().tell(
+					Messages.getAcknowledge(),
+					self());
+			} else {
+				waitForResourceManagerConnected.add(sender());
+			}
+		} else if (message instanceof RegisterResourceManagerSuccessful) {
+			super.handleMessage(message);
+
+			isConnected = true;
+
+			for (ActorRef ref : waitForResourceManagerConnected) {
+				ref.tell(
+					Messages.getAcknowledge(),
+					self());
+			}
+			waitForResourceManagerConnected.clear();
+
+		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
+			waitForShutdown.add(sender());
+		} else if (message instanceof TestingMessages.Alive$) {
+			sender().tell(Messages.getAcknowledge(), self());
+		} else {
+			super.handleMessage(message);
+		}
+	}
+
+	/**
+	 * Testing messages
+	 */
+	public static class GetRegisteredResources {}
+
+	public static class GetRegisteredResourcesReply {
+
+		public Collection<ResourceID> resources;
+
+		public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
+			this.resources = resources;
+		}
+
+	}
+
+	/**
+	 * Asks the resource manager to fail the single resource identified by the given resource ID
+	 */
+	public static class FailResource {
+
+		public ResourceID resourceID;
+
+		public FailResource(ResourceID resourceID) {
+			this.resourceID = resourceID;
+		}
+	}
+
+	/**
+	 * The sender of this message will be informed of a connection to the Job Manager
+	 */
+	public static class NotifyWhenResourceManagerConnected {}
+
+	/**
+	 * Only informs the registered listeners about the shutdown; the superclass is not invoked.
+	 */
+	@Override
+	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+		for (ActorRef listener : waitForShutdown) {
+			listener.tell(new TestingMessages.ComponentShutdown(self()), self());
+		}
+		waitForShutdown.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c01a321 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,22 +18,32 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 
 import akka.pattern.ask
-import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.Patterns._
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, Future}
 
 /**
@@ -48,7 +58,7 @@ class TestingCluster(
     userConfiguration: Configuration,
     singleActorSystem: Boolean,
     synchronousDispatcher: Boolean)
-  extends FlinkMiniCluster(
+  extends LocalFlinkMiniCluster(
     userConfiguration,
     singleActorSystem) {
 
@@ -59,133 +69,54 @@ class TestingCluster(
 
   // --------------------------------------------------------------------------
 
-  override def generateConfiguration(userConfig: Configuration): Configuration = {
-    val cfg = new Configuration()
-    cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
-    cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-    cfg.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 0)
-    cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
-    cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
-
-    setDefaultCiConfig(cfg)
-
-    cfg.addAll(userConfig)
-    cfg
-  }
-
-  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val jobManagerName = if(singleActorSystem) {
-      JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
-    } else {
-      JobManager.JOB_MANAGER_NAME
-    }
-
-    val archiveName = if(singleActorSystem) {
-      JobManager.ARCHIVE_NAME + "_" + (index + 1)
-    } else {
-      JobManager.ARCHIVE_NAME
-    }
-
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if(jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
-    }
-
-    val (executionContext,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategyFactory,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobsGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricRegistry) = JobManager.createJobManagerComponents(
-      config,
-      createLeaderElectionService())
-
-    val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
-    val archive = actorSystem.actorOf(testArchiveProps, archiveName)
-
-    val jobManagerProps = Props(
-      new TestingJobManager(
-        configuration,
-        executionContext,
-        instanceManager,
-        scheduler,
-        libraryCacheManager,
-        archive,
-        restartStrategyFactory,
-        timeout,
-        leaderElectionService,
-        submittedJobsGraphs,
-        checkpointRecoveryFactory,
-        savepointStore,
-        jobRecoveryTimeout,
-        metricRegistry))
-
-    val dispatcherJobManagerProps = if (synchronousDispatcher) {
-      // disable asynchronous futures (e.g. accumulator update in Heartbeat)
-      jobManagerProps.withDispatcher(CallingThreadDispatcher.Id)
-    } else {
-      jobManagerProps
-    }
-
-    actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName)
-  }
-
-  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val resourceManagerName = if(singleActorSystem) {
-      FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+  override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager]
+
+  override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+    classOf[TestingResourceManager]
+
+  override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager]
+
+  override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist]
+
+  override def getJobManagerProps(
+    jobManagerClass: Class[_ <: JobManager],
+    configuration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphStore: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry]): Props = {
+
+    val props = super.getJobManagerProps(
+      jobManagerClass,
+      configuration,
+      executorService,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archive,
+      restartStrategyFactory,
+      timeout,
+      leaderElectionService,
+      submittedJobGraphStore,
+      checkpointRecoveryFactory,
+      savepointStore,
+      jobRecoveryTimeout,
+      metricsRegistry)
+
+    if (synchronousDispatcher) {
+      props.withDispatcher(CallingThreadDispatcher.Id)
     } else {
-      FlinkResourceManager.RESOURCE_MANAGER_NAME
+      props
     }
-
-    val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
-    if(resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
-    }
-
-    val testResourceManagerProps = Props(
-      new TestingResourceManager(
-        config,
-        createLeaderRetrievalService()
-      ))
-
-    system.actorOf(testResourceManagerProps, resourceManagerName)
-  }
-
-  override def startTaskManager(index: Int, system: ActorSystem) = {
-
-    val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
-
-    TaskManager.startTaskManagerComponentsAndActor(
-      configuration,
-      ResourceID.generate(),
-      system,
-      hostname,
-      Some(tmActorName),
-      Some(createLeaderRetrievalService()),
-      numTaskManagers == 1,
-      classOf[TestingTaskManager])
-  }
-
-
-  def createLeaderElectionService(): Option[LeaderElectionService] = {
-    None
   }
 
   @throws(classOf[TimeoutException])
@@ -228,4 +159,131 @@ class TestingCluster(
 
     Await.ready(combinedFuture, timeout)
   }
+
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val futures = taskManagerActors.map {
+      _.map {
+              tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+            }
+    }.getOrElse(Seq())
+
+    try {
+      Await.ready(Future.sequence(futures), timeout)
+    } catch {
+      case t: TimeoutException =>
+        throw new Exception("Timeout while waiting for TaskManagers to register at " +
+                              s"${jobManager.path}")
+    }
+
+  }
+
+  def restartLeadingJobManager(): Unit = {
+    this.synchronized {
+      (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(jmActorSystems), Some(jmActors)) =>
+          val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration))
+          val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration))
+
+          // restart the leading job manager with the same port
+          val port = getLeaderRPCPort
+          val oldPort = originalConfiguration.getInteger(
+            ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            0)
+
+          // temporarily set the leader's port in the configuration because it is used for startup
+          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+
+          clearLeader()
+
+          val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION)
+          Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+          if(!singleActorSystem) {
+            jmActorSystems(index).shutdown()
+            jmActorSystems(index).awaitTermination()
+          }
+
+          val newJobManagerActorSystem = if(!singleActorSystem) {
+            startJobManagerActorSystem(index)
+          } else {
+            jmActorSystems.head
+          }
+
+          // reset the original configuration
+          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+
+          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+          jobManagerActorSystems = Some(jmActorSystems.patch(
+            index,
+            Seq(newJobManagerActorSystem),
+            1))
+
+          val lrs = createLeaderRetrievalService()
+
+          jobManagerLeaderRetrievalService = Some(lrs)
+          lrs.start(this)
+
+        case _ => throw new Exception("The JobManager of the TestingCluster have not " +
+                                        "been started properly.")
+      }
+    }
+  }
+
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(tmActorSystems), Some(tmActors)) =>
+        val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION)
+        Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+        if(!singleActorSystem) {
+          tmActorSystems(index).shutdown()
+          tmActorSystems(index).awaitTermination()
+        }
+
+        val taskManagerActorSystem  = if(!singleActorSystem) {
+          startTaskManagerActorSystem(index)
+        } else {
+          tmActorSystems.head
+        }
+
+        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+      case _ => throw new Exception("The TaskManager of the TestingCluster have not " +
+                                      "been started properly.")
+    }
+  }
+
+  def addTaskManager(): Unit = {
+    if (useSingleActorSystem) {
+      (jobManagerActorSystems, taskManagerActors) match {
+        case (Some(jmSystems), Some(tmActors)) =>
+          val index = numTaskManagers
+          taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
+          numTaskManagers += 1
+        case _ => throw new IllegalStateException("Cluster has not been started properly.")
+      }
+    } else {
+      (taskManagerActorSystems, taskManagerActors) match {
+        case (Some(tmSystems), Some(tmActors)) =>
+          val index = numTaskManagers
+          val newTmSystem = startTaskManagerActorSystem(index)
+          val newTmActor = startTaskManager(index, newTmSystem)
+
+          taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
+          taskManagerActors = Some(tmActors :+ newTmActor)
+
+          numTaskManagers += 1
+        case _ => throw new IllegalStateException("Cluster has not been started properly.")
+      }
+    }
+  }
+}
+
+object TestingCluster {
+  val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
new file mode 100644
index 0000000..62349db
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** JobManager implementation extended by testing messages
+  *
+  */
+class TestingJobManager(
+    flinkConfiguration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore : SavepointStore,
+    jobRecoveryTimeout : FiniteDuration,
+    metricRegistry : Option[MetricRegistry])
+  extends JobManager(
+    flinkConfiguration,
+      executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricRegistry)
+  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..5ba2790
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Cancellable, Terminated}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+
+  import context._
+
+  import scala.collection.JavaConverters._
+
+  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+    new Ordering[(Int, ActorRef)] {
+      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+    })
+
+  val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
+
+  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+  var disconnectDisabled = false
+
+  var postStopEnabled = true
+
+  abstract override def postStop(): Unit = {
+    if (postStopEnabled) {
+      super.postStop()
+    } else {
+      // only stop leader election service to revoke the leadership of this JM so that a new JM
+      // can be elected leader
+      leaderElectionService.stop()
+    }
+  }
+
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+          ExecutionGraphFound(
+            jobID,
+            executionGraph)
+        )
+
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if(checkIfAllVerticesRunning(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+      if(checkIfAllVerticesRunningOrFinished(jobID)){
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      }else{
+        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(
+              context.system.scheduler.schedule(
+                0 seconds,
+                200 millis,
+                self,
+                decorateMessage(NotifyListeners)
+              )
+            )
+        }
+      }
+
+    case NotifyListeners =>
+      for(jobID <- currentJobs.keySet){
+        notifyListeners(jobID)
+      }
+
+      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+        periodicCheck foreach { _.cancel() }
+        periodicCheck = None
+      }
+
+
+    case NotifyWhenJobRemoved(jobID) =>
+      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+
+      val responses = gateways.map{
+        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+      }
+
+      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+
+      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+      import context.dispatcher
+      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+
+    case CheckIfJobRemoved(jobID) =>
+      if(currentJobs.contains(jobID)) {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID))
+        )(context.dispatcher, sender())
+      } else {
+        sender() ! decorateMessage(true)
+      }
+
+    case NotifyWhenTaskManagerTerminated(taskManager) =>
+      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
+
+    case msg@Terminated(taskManager) =>
+      super.handleMessage(msg)
+
+      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+        _ foreach {
+          listener =>
+            listener ! decorateMessage(TaskManagerTerminated(taskManager))
+        }
+      }
+
+    // see shutdown method for reply
+    case NotifyOfComponentShutdown =>
+      waitForShutdown += sender()
+
+    case NotifyWhenAccumulatorChange(jobID) =>
+
+      val (updated, registered) = waitForAccumulatorUpdate.
+        getOrElse(jobID, (false, Set[ActorRef]()))
+      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
+      sender ! true
+
+    /**
+     * Notification from the task manager that changed accumulators are transferred on next
+     * Heartbeat. We need to keep this state to notify the listeners on next Heartbeat report.
+     */
+    case AccumulatorsChanged(jobID: JobID) =>
+      waitForAccumulatorUpdate.get(jobID) match {
+        case Some((updated, registered)) =>
+          waitForAccumulatorUpdate.put(jobID, (true, registered))
+        case None =>
+      }
+
+    /**
+     * Disabled async processing of accumulator values and send accumulators to the listeners if
+     * we previously received an [[AccumulatorsChanged]] message.
+     */
+    case msg : Heartbeat =>
+      super.handleMessage(msg)
+
+      waitForAccumulatorUpdate foreach {
+        case (jobID, (updated, actors)) if updated =>
+          currentJobs.get(jobID) match {
+            case Some((graph, jobInfo)) =>
+              val flinkAccumulators = graph.getFlinkAccumulators
+              val userAccumulators = graph.aggregateUserAccumulators
+              actors foreach {
+                 actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+              }
+            case None =>
+          }
+          waitForAccumulatorUpdate.put(jobID, (false, actors))
+        case _ =>
+      }
+
+    case RequestWorkingTaskManager(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((eg, _)) =>
+          if(eg.getAllExecutionVertices.asScala.isEmpty){
+            sender ! decorateMessage(WorkingTaskManager(None))
+          } else {
+            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
+
+            if(resource == null){
+              sender ! decorateMessage(WorkingTaskManager(None))
+            } else {
+              sender ! decorateMessage(
+                WorkingTaskManager(
+                  Some(resource.getTaskManagerActorGateway())
+                )
+              )
+            }
+          }
+        case None => sender ! decorateMessage(WorkingTaskManager(None))
+      }
+
+    case NotifyWhenJobStatus(jobID, state) =>
+      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+      jobStatusListener += state -> (listener + sender)
+
+    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+      super.handleMessage(msg)
+
+      val cleanup = waitForJobStatus.get(jobID) match {
+        case Some(stateListener) =>
+          stateListener.remove(newJobStatus) match {
+            case Some(listeners) =>
+              listeners foreach {
+                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
+              }
+            case _ =>
+          }
+          stateListener.isEmpty
+
+        case _ => false
+      }
+
+      if (cleanup) {
+        waitForJobStatus.remove(jobID)
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case DisablePostStop =>
+      postStopEnabled = false
+
+    case RequestSavepoint(savepointPath) =>
+      try {
+        val savepoint = savepointStore.loadSavepoint(savepointPath)
+        sender ! ResponseSavepoint(savepoint)
+      }
+      catch {
+        case e: Exception =>
+          sender ! ResponseSavepoint(null)
+      }
+
+    case msg: Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        val taskManager = sender()
+
+        waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+          _ foreach {
+            listener =>
+              listener ! decorateMessage(TaskManagerTerminated(taskManager))
+          }
+        }
+      }
+
+    case NotifyWhenLeader =>
+      if (leaderElectionService.hasLeadership) {
+        sender() ! true
+      } else {
+        waitForLeader += sender()
+      }
+
+    case msg: GrantLeadership =>
+      super.handleMessage(msg)
+
+      waitForLeader.foreach(_ ! true)
+
+      waitForLeader.clear()
+
+    case NotifyWhenClientConnects =>
+      waitForClient += sender()
+      sender() ! true
+
+    case msg: RegisterJobClient =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClientConnected)
+    case msg: RequestClassloadingProps =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
+
+    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+      if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+        sender() ! Acknowledge
+      } else {
+        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+      }
+
+    // A TaskManager may be registered on this message
+    case msg @ (_: RegisterTaskManager) =>
+      super.handleMessage(msg)
+
+      // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+      // fewer registered TaskManagers
+      while (waitForNumRegisteredTaskManagers.nonEmpty &&
+        waitForNumRegisteredTaskManagers.head._1 <=
+          instanceManager.getNumberOfRegisteredTaskManagers) {
+        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+        receiver ! Acknowledge
+      }
+  }
+
+  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
+      case None => false
+    }
+  }
+
+  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.asScala.forall {
+          case vertex =>
+            (vertex.getExecutionState == ExecutionState.RUNNING
+              || vertex.getExecutionState == ExecutionState.FINISHED)
+        }
+      case None => false
+    }
+  }
+
+  def notifyListeners(jobID: JobID): Unit = {
+    if(checkIfAllVerticesRunning(jobID)) {
+      waitForAllVerticesToBeRunning.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+
+    if(checkIfAllVerticesRunningOrFinished(jobID)) {
+      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
+        case Some(listeners) =>
+          for (listener <- listeners) {
+            listener ! decorateMessage(AllVerticesRunning(jobID))
+          }
+        case _ =>
+      }
+    }
+  }
+
+  /**
+    * No killing of the VM for testing.
+    */
+  override protected def shutdown(): Unit = {
+    log.info("Shutting down TestingJobManager.")
+    waitForShutdown.foreach(_ ! ComponentShutdown(self))
+    waitForShutdown.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
new file mode 100644
index 0000000..d07c48f
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.Map
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.jobgraph.JobStatus
+
+object TestingJobManagerMessages {
+
+  case class RequestExecutionGraph(jobID: JobID)
+
+  sealed trait ResponseExecutionGraph {
+    def jobID: JobID
+  }
+
+  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+  ResponseExecutionGraph
+
+  case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
+
+  case class WaitForAllVerticesToBeRunning(jobID: JobID)
+  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
+  case class AllVerticesRunning(jobID: JobID)
+
+  case class NotifyWhenJobRemoved(jobID: JobID)
+
+  case class RequestWorkingTaskManager(jobID: JobID)
+  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
+
+  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+  case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+  case object NotifyListeners
+
+  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+  case class TaskManagerTerminated(taskManager: ActorRef)
+
+  /**
+   * Registers a listener to receive a message when accumulators changed.
+   * The change must be explicitly triggered by the TestingTaskManager which can receive an
+   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+   * message by a task that changed the accumulators. This message is then
+   * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
+   * message when the next Heartbeat occurs.
+   */
+  case class NotifyWhenAccumulatorChange(jobID: JobID)
+
+  /**
+   * Reports updated accumulators back to the listener.
+   */
+  case class UpdatedAccumulators(jobID: JobID,
+    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+    userAccumulators: Map[String, Accumulator[_,_]])
+
+  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
+   *
+   */
+  case object NotifyWhenLeader
+
+  /**
+    * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
+    */
+  case object NotifyWhenClientConnects
+  /**
+    * Notifies of a client connect
+    */
+  case object ClientConnected
+  /**
+    * Notifies when the client has requested class loading information
+    */
+  case object ClassLoadingPropsDelivered
+
+  /**
+   * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
+   * message when at least numRegisteredTaskManager have registered at the JobManager.
+   *
+   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+   */
+  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
+  /** Disables the post stop method of the [[TestingJobManager]].
+    *
+    * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
+    */
+  case object DisablePostStop
+
+  /**
+    * Requests a savepoint from the job manager.
+    *
+    * @param savepointPath The path of the savepoint to request.
+    */
+  case class RequestSavepoint(savepointPath: String)
+
+  /**
+    * Response to a savepoint request.
+    *
+    * @param savepoint The requested savepoint or null if none available.
+    */
+  case class ResponseSavepoint(savepoint: Savepoint)
+
+  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
+  def getDisablePostStop(): AnyRef = DisablePostStop
+
+  def getClientConnected(): AnyRef = ClientConnected
+  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
new file mode 100644
index 0000000..48a1ddd
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
+
+/** Memory archivist extended by testing messages
+  *
+  * @param maxEntries maximum number of archived jobs
+  */
+class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
+
+  override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case RequestExecutionGraph(jobID) =>
+      val executionGraph = graphs.get(jobID)
+      
+      executionGraph match {
+        case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
+        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
new file mode 100644
index 0000000..91d169a
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+
+object TestingMessages {
+
+  case class CheckIfJobRemoved(jobID: JobID)
+
+  case object DisableDisconnect
+
+  case object Alive
+
+  def getAlive: AnyRef = Alive
+
+  def getDisableDisconnect: AnyRef = DisableDisconnect
+
+  case object NotifyOfComponentShutdown
+  case class ComponentShutdown(ref: ActorRef)
+
+  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
new file mode 100644
index 0000000..9b5a147
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+
+import scala.language.postfixOps
+
+/** Subclass of the [[TaskManager]] to support testing messages
+ */
+class TestingTaskManager(
+                          config: TaskManagerConfiguration,
+                          resourceID: ResourceID,
+                          connectionInfo: TaskManagerLocation,
+                          memoryManager: MemoryManager,
+                          ioManager: IOManager,
+                          network: NetworkEnvironment,
+                          numberOfSlots: Int,
+                          leaderRetrievalService: LeaderRetrievalService)
+  extends TaskManager(
+    config,
+    resourceID,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {
+
+  def this(
+            config: TaskManagerConfiguration,
+            connectionInfo: TaskManagerLocation,
+            memoryManager: MemoryManager,
+            ioManager: IOManager,
+            network: NetworkEnvironment,
+            numberOfSlots: Int,
+            leaderRetrievalService: LeaderRetrievalService) {
+    this(
+      config,
+      ResourceID.generate(),
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      numberOfSlots,
+      leaderRetrievalService)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..2498dbe
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Terminated}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+  *
+  * Testing messages are handled by `handleTestingMessage`, which is chained in front of the
+  * regular message handler. Some regular messages are intercepted as well: they are passed on
+  * to the super implementation and additionally used to notify waiting test actors.
+  */
+trait TestingTaskManagerLike extends FlinkActor {
+  that: TaskManager =>
+
+  import scala.collection.JavaConverters._
+
+  /** Actors waiting for the task with the given execution attempt ID to reach a final state */
+  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+
+  /** Actors waiting for a job manager to terminate or disconnect, keyed by the name of the
+    * job manager's actor path
+    */
+  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+  /** Actors waiting for a registration response to be handled. Despite its name, this map is
+    * keyed by the job manager reference given in `NotifyWhenRegisteredAtJobManager`.
+    */
+  val waitForRegisteredAtResourceManager =
+    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+
+  /** Actors waiting for the task with the given execution attempt ID to be running */
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+
+  /** Execution attempts for which a `TaskInFinalState` message has been handled */
+  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+
+  /** Map of registered task submit listeners */
+  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
+
+  /** Actors to notify with `ComponentShutdown` when `shutdown()` is called */
+  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+  /** While true, incoming `Disconnect` messages are ignored (set by `DisableDisconnect`) */
+  var disconnectDisabled = false
+
+  /**
+   * Handler for testing related messages. Testing messages take precedence; everything that
+   * `handleTestingMessage` does not match falls through to the super implementation.
+   */
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  /**
+   * Handles the testing messages and intercepts those regular messages on which test actors
+   * can wait. Intercepted regular messages are always passed to `super.handleMessage` too.
+   */
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case NotifyWhenTaskIsRunning(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+          sender ! decorateMessage(true)
+
+        case _ =>
+          // Unknown or not yet running: answer later, when the RUNNING state update arrives
+          val listeners = waitForRunning.getOrElse(executionID, Set())
+          waitForRunning += (executionID -> (listeners + sender))
+      }
+
+    case RequestRunningTasks =>
+      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+    case NotifyWhenTaskRemoved(executionID) =>
+      // The task counts as removed only if it is no longer running AND its final state has
+      // already been seen. In every other case the sender waits for TaskInFinalState.
+      val alreadyRemoved =
+        Option(runningTasks.get(executionID)).isEmpty && unregisteredTasks.contains(executionID)
+
+      if (alreadyRemoved) {
+        sender ! decorateMessage(true)
+      } else {
+        val set = waitForRemoval.getOrElse(executionID, Set())
+        waitForRemoval += (executionID -> (set + sender))
+      }
+
+    case TaskInFinalState(executionID) =>
+      super.handleMessage(TaskInFinalState(executionID))
+      waitForRemoval.remove(executionID) match {
+        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+        case None =>
+      }
+
+      unregisteredTasks += executionID
+
+    case NotifyWhenJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+        // Tasks of the job are still running: poll again in 200 ms on behalf of the sender
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }else{
+        sender ! decorateMessage(true)
+      }
+
+    case CheckIfJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+        sender ! decorateMessage(true)
+      } else {
+        // Still running: keep polling, preserving the original requester as the sender
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }
+
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
+
+    case RegisterSubmitTaskListener(jobId) =>
+      registeredSubmitTaskListeners.put(jobId, sender())
+
+    case msg@SubmitTask(tdd) =>
+      registeredSubmitTaskListeners.get(tdd.getJobID) match {
+        case Some(listenerRef) =>
+          listenerRef ! ResponseSubmitTaskListener(tdd)
+        case None =>
+        // Nothing to do
+      }
+
+      super.handleMessage(msg)
+
+    /**
+     * Message from the task manager that accumulator values changed and need to be reported
+     * immediately instead of lazily through the
+     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
+     * message to the job manager so that it knows it should report to the listeners.
+     */
+    case msg: AccumulatorsChanged =>
+      currentJobManager match {
+        case Some(jobManager) =>
+          jobManager.forward(msg)
+          sendHeartbeatToJobManager()
+          sender ! true
+        case None =>
+      }
+
+    case msg@Terminated(jobManager) =>
+      super.handleMessage(msg)
+
+      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+        _ foreach {
+          _ ! decorateMessage(JobManagerTerminated(jobManager))
+        }
+      }
+
+    case msg:Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        // The sender of the Disconnect message is treated as the job manager
+        val jobManager = sender()
+
+        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+          _ foreach {
+            _ ! decorateMessage(JobManagerTerminated(jobManager))
+          }
+        }
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case NotifyOfComponentShutdown =>
+      waitForShutdown += sender()
+
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.handleMessage(msg)
+
+      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        // NOTE(review): the listeners stay registered in waitForRunning after being notified
+        // (get, not remove) -- confirm that this is intended.
+        waitForRunning.get(taskExecutionState.getID) foreach {
+          _ foreach (_ ! decorateMessage(true))
+        }
+      }
+
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      // exists instead of Option.get: an empty currentJobManager simply does not match
+      if(isConnected && currentJobManager.exists(_ == jobManager)) {
+        sender() ! true
+      } else {
+        val list = waitForRegisteredAtResourceManager.getOrElse(
+          jobManager,
+          Set[ActorRef]())
+
+        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
+      }
+
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+
+      // The sender of the registration response is the key the listeners were stored under
+      val jm = sender()
+
+      waitForRegisteredAtResourceManager.remove(jm).foreach {
+        listeners => listeners.foreach{
+          listener =>
+            listener ! true
+        }
+      }
+  }
+
+  /**
+    * No killing of the VM for testing.
+    *
+    * This override does not call the super implementation. It sends a `ComponentShutdown`
+    * message for this actor to every actor that registered via `NotifyOfComponentShutdown`
+    * and then clears the set of waiting actors.
+    */
+  override protected def shutdown(): Unit = {
+    log.info("Shutting down TestingTaskManager.")
+    waitForShutdown.foreach(_ ! ComponentShutdown(self))
+    waitForShutdown.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
new file mode 100644
index 0000000..32c3c55
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.taskmanager.Task
+
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+
+  /** Request to be notified once the task with the given execution attempt ID is removed. */
+  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
+  /** Request to be notified once the task with the given execution attempt ID is running. */
+  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
+
+  /**
+    * Carries a map of tasks keyed by their execution attempt ID.
+    *
+    * @param tasks The tasks, as an immutable Scala map.
+    */
+  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
+    import scala.collection.JavaConverters._
+
+    /** The same tasks exposed as a `java.util.Map`, for access from Java code. */
+    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
+  }
+
+  /** Request for the currently running tasks. */
+  case object RequestRunningTasks
+
+  /** Request to be notified once the given job manager has terminated. */
+  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
+
+  /** Notification that the given job manager has terminated. */
+  case class JobManagerTerminated(jobManager: ActorRef)
+
+  /**
+    * Request to be notified once the task manager is registered at a job manager.
+    *
+    * NOTE(review): the parameter is called `resourceManager`, while the message name refers
+    * to a job manager -- confirm which actor callers are expected to pass.
+    */
+  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
+
+  /**
+   * Message to give a hint to the task manager that accumulator values were updated in the task.
+   * This message is forwarded to the job manager which knows that it needs to notify listeners
+   * of accumulator updates.
+   */
+  case class AccumulatorsChanged(jobID: JobID)
+
+  /**
+    * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+    * messages of the given job.
+    *
+    * If a task is submitted with the given job ID the task deployment
+    * descriptor is forwarded to the listener.
+    *
+    * @param jobId The job ID to listen for.
+    */
+  case class RegisterSubmitTaskListener(jobId: JobID)
+
+  /**
+    * A response to a listened job ID containing the submitted task deployment descriptor.
+    *
+    * @param tdd The submitted task deployment descriptor.
+    */
+  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+
+  /** Returns the [[RequestRunningTasks]] case object, typed as `AnyRef`. */
+  def getRequestRunningTasksMessage: AnyRef = RequestRunningTasks
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index ee1b264..00410cc 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,9 +22,10 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, BeforeClass, Test, Assert}
+import org.junit.{AfterClass, Assert, BeforeClass, Test}
 
 import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
@@ -334,7 +335,7 @@ class ScalaShellITCase extends TestLogger {
 }
 
 object ScalaShellITCase {
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
   val parallelism = 4
 
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 8bb440c..f94ff68 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -134,7 +134,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 	}
 
-	private static ForkableFlinkMiniCluster flinkCluster;
+	private static LocalFlinkMiniCluster flinkCluster;
 
 	// ------------------------------------------------------------------------
 	//  Cluster Setup (Cassandra & Flink)
@@ -205,7 +205,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 
-		flinkCluster = new ForkableFlinkMiniCluster(config);
+		flinkCluster = new LocalFlinkMiniCluster(config);
 		flinkCluster.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..c4949ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	
 	private static KafkaTestEnvironment kafkaServer;
 	private static Properties standardProps;
-	private static ForkableFlinkMiniCluster flink;
+	private static LocalFlinkMiniCluster flink;
 
 	@BeforeClass
 	public static void prepare() throws IOException, ClassNotFoundException {
@@ -88,7 +88,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index eddb57c..771db17 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +65,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	protected static Properties standardProps;
 	
-	protected static ForkableFlinkMiniCluster flink;
+	protected static LocalFlinkMiniCluster flink;
 
 	protected static int flinkPort;
 
@@ -105,7 +105,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 3705943..2e452c1 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +80,7 @@ public class ManualExactlyOnceTest {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-		ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
 		flink.start();
 
 		final int flinkPort = flink.getLeaderRPCPort();