Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 15:33:26 UTC

[6/9] flink git commit: [FLINK-1984] Mesos ResourceManager - T1 milestone (3)

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
deleted file mode 100644
index 49c86b5..0000000
--- a/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework
-
-import java.util.concurrent.{TimeUnit, ExecutorService}
-
-import akka.actor.ActorRef
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.clusterframework.ApplicationStatus
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.clusterframework.messages._
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-
-/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
-  * to start/administer/stop the session.
-  *
-  * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
-  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
-  * @param instanceManager Instance manager to manage the registered
-  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
-  * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
-  * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
-  * @param timeout Timeout for futures
-  * @param leaderElectionService LeaderElectionService to participate in the leader election
-  */
-abstract class ContaineredJobManager(
-                      flinkConfiguration: FlinkConfiguration,
-                      executorService: ExecutorService,
-                      instanceManager: InstanceManager,
-                      scheduler: FlinkScheduler,
-                      libraryCacheManager: BlobLibraryCacheManager,
-                      archive: ActorRef,
-                      restartStrategyFactory: RestartStrategyFactory,
-                      timeout: FiniteDuration,
-                      leaderElectionService: LeaderElectionService,
-                      submittedJobGraphs : SubmittedJobGraphStore,
-                      checkpointRecoveryFactory : CheckpointRecoveryFactory,
-                      savepointStore: SavepointStore,
-                      jobRecoveryTimeout: FiniteDuration,
-                      metricsRegistry: Option[FlinkMetricRegistry])
-  extends JobManager(
-    flinkConfiguration,
-    executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricsRegistry) {
-
-  val jobPollingInterval: FiniteDuration
-
-  // indicates if this JM has been started in a dedicated (per-job) mode.
-  var stopWhenJobFinished: JobID = null
-
-  override def handleMessage: Receive = {
-    handleContainerMessage orElse super.handleMessage
-  }
-
-  def handleContainerMessage: Receive = {
-
-    case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
-      // forward to ResourceManager
-      currentResourceManager match {
-        case Some(rm) =>
-          // we forward the message
-          rm.forward(decorateMessage(msg))
-        case None =>
-        // client has to try again
-      }
-
-    case msg: ShutdownClusterAfterJob =>
-      val jobId = msg.jobId()
-      log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
-      stopWhenJobFinished = jobId
-      // trigger regular job status messages (if this is a dedicated/per-job cluster)
-      if (stopWhenJobFinished != null) {
-        context.system.scheduler.schedule(0 seconds,
-          jobPollingInterval,
-          new Runnable {
-            override def run(): Unit = {
-              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
-            }
-          }
-        )(context.dispatcher)
-      }
-
-      sender() ! decorateMessage(Acknowledge)
-
-    case msg: GetClusterStatus =>
-      sender() ! decorateMessage(
-        new GetClusterStatusResponse(
-          instanceManager.getNumberOfRegisteredTaskManagers,
-          instanceManager.getTotalNumberOfSlots)
-      )
-
-    case jnf: JobNotFound =>
-      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
-      if (stopWhenJobFinished == null) {
-        log.warn("The ApplicationMaster didn't expect to receive this message")
-      }
-
-    case jobStatus: CurrentJobStatus =>
-      if (stopWhenJobFinished == null) {
-        log.warn(s"Received job status $jobStatus which wasn't requested.")
-      } else {
-        if (stopWhenJobFinished != jobStatus.jobID) {
-          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
-            s"job $stopWhenJobFinished")
-        } else {
-          if (jobStatus.status.isTerminalState) {
-            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
-              s"Shutting down session")
-            if (jobStatus.status == JobStatus.FINISHED) {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.SUCCEEDED,
-                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
-              )
-            } else {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.FAILED,
-                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
-              )
-            }
-          } else {
-            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
-          }
-        }
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 05fb033..f287e13 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -235,9 +235,9 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state then initialize the RM
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1);
-						MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newTask(task2).launchTask(slave1, slave1host);
-						MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newTask(task3).launchTask(slave1, slave1host).releaseTask();
+						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+						MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+						MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3));
 						initialize();
@@ -276,7 +276,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state then initialize the RM
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
 						initialize();
@@ -306,7 +306,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial state with a (recovered) launched worker
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
 						initialize();
@@ -339,7 +339,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state, initialize the RM, then register with task1 as a registered worker
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
 						initialize();
@@ -351,12 +351,13 @@ public class MesosFlinkResourceManagerTest {
 
 						// verify that the worker was persisted, the internal state was updated, the task router was notified,
 						// and the launch coordinator was notified about the host assignment change
-						MesosWorkerStore.Worker worker2Released = worker1.releaseTask();
+						MesosWorkerStore.Worker worker2Released = worker1.releaseWorker();
 						verify(workerStore).putWorker(worker2Released);
 						assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task1), worker2Released));
 						resourceManagerInstance.launchCoordinator.expectMsg(new LaunchCoordinator.Unassign(task1, slave1host));
 
 						// send the subsequent terminated message
+						when(workerStore.removeWorker(task1)).thenReturn(true);
 						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
 							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FINISHED).build()));
 
@@ -391,7 +392,7 @@ public class MesosFlinkResourceManagerTest {
 
 						// verify that a new worker was persisted, the internal state was updated, the task router was notified,
 						// and the launch coordinator was asked to launch a task
-						MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newTask(task1);
+						MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1);
 						verify(workerStore).putWorker(expected);
 						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
 						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
@@ -438,7 +439,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state with a new task then initialize the RM
-						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1);
+						MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
 						initialize();
@@ -455,7 +456,7 @@ public class MesosFlinkResourceManagerTest {
 
 						// verify that the worker was persisted, the internal state was updated,
 						// Mesos was asked to launch task1, and the task router was notified
-						MesosWorkerStore.Worker worker1launched = worker1.launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host);
 						verify(workerStore).putWorker(worker1launched);
 						assertThat(resourceManagerInstance.workersInNew.entrySet(), empty());
 						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
@@ -505,7 +506,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state with a launched worker that hasn't yet registered
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
 						initialize();
@@ -513,6 +514,7 @@ public class MesosFlinkResourceManagerTest {
 
 						// tell the RM that a task failed (and prepare a replacement task)
 						when(workerStore.newTaskID()).thenReturn(task2);
+						when(workerStore.removeWorker(task1)).thenReturn(true);
 						resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
 						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
 							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
@@ -540,7 +542,7 @@ public class MesosFlinkResourceManagerTest {
 				protected void run() {
 					try {
 						// set the initial persistent state with a launched & registered worker
-						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+						MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
 						when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
 						when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
 						initialize();
@@ -548,6 +550,7 @@ public class MesosFlinkResourceManagerTest {
 
 						// tell the RM that a task failed (and prepare a replacement task)
 						when(workerStore.newTaskID()).thenReturn(task2);
+						when(workerStore.removeWorker(task1)).thenReturn(true);
 						resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
 						resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
 							.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
@@ -582,7 +585,7 @@ public class MesosFlinkResourceManagerTest {
 
 						// verify that the Mesos framework is shutdown
 						verify(schedulerDriver).stop(false);
-						verify(workerStore).cleanup();
+						verify(workerStore).stop(true);
 						expectTerminated(resourceManager.actor());
 					}
 					catch(Exception ex) {
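
The hunks above follow the MesosWorkerStore.Worker lifecycle rename (newTask/launchTask/releaseTask become newWorker/launchWorker/releaseWorker) and stub the removeWorker call, which now reports success as a boolean. A minimal sketch of the renamed lifecycle, assuming the store API exercised by this test (taskID, slaveID and hostname are illustrative placeholders, not values from the test):

    val worker = MesosWorkerStore.Worker
      .newWorker(taskID)                  // freshly requested, not yet launched
      .launchWorker(slaveID, hostname)    // bound to a Mesos slave and host
      .releaseWorker()                    // marked for release
    workerStore.putWorker(worker)         // persist the updated state
    val removed: Boolean = workerStore.removeWorker(taskID)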

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
index 3ab72cd..80186cf 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
@@ -31,7 +31,8 @@ object Matchers {
   def contentsMatch[T](plan: Seq[T]): java.util.Collection[T] = {
     org.mockito.Matchers.argThat(new ArgumentMatcher[java.util.Collection[T]] {
       override def matches(o: scala.Any): Boolean = o match {
-        case actual: java.util.Collection[T] => actual.size() == plan.size && actual.containsAll(plan.asJava)
+        case actual: java.util.Collection[T] =>
+          actual.size() == plan.size && actual.containsAll(plan.asJava)
         case _ => false
       }
     })
@@ -46,7 +47,8 @@ object TestFSMUtils {
     "$" + akka.util.Helpers.base64(l)
   }
 
-  def testFSMRef[S, D, T <: Actor: ClassTag](factory: ⇒ T, supervisor: ActorRef)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
+  def testFSMRef[S, D, T <: Actor: ClassTag](factory: ⇒ T, supervisor: ActorRef)
+      (implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
     new TestFSMRef(system, Props(factory), supervisor, TestFSMUtils.randomName)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
index be7d788..34c1f66 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
@@ -93,7 +93,9 @@ class LaunchCoordinatorTest
 
     val task: LaunchableTask = new LaunchableTask() {
       override def taskRequest: TaskRequest = generateTaskRequest
-      override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
+      override def launch(
+          slaveId: SlaveID,
+          taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
         Protos.TaskInfo.newBuilder
           .setTaskId(taskID).setName(taskID.getValue)
           .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
@@ -135,11 +137,12 @@ class LaunchCoordinatorTest
     */
   def taskAssignmentResult(lease: VirtualMachineLease, task: TaskRequest): TaskAssignmentResult = {
     val ports = lease.portRanges().get(0)
+    val assignedPorts = ports.getBeg to ports.getBeg + task.getPorts
     val r = mock(classOf[TaskAssignmentResult])
     when(r.getTaskId).thenReturn(task.getId)
     when(r.getHostname).thenReturn(lease.hostname())
     when(r.getAssignedPorts).thenReturn(
-      (ports.getBeg to ports.getBeg + task.getPorts).toList.asJava.asInstanceOf[java.util.List[Integer]])
+      assignedPorts.toList.asJava.asInstanceOf[java.util.List[Integer]])
     when(r.getRequest).thenReturn(task)
     when(r.isSuccessful).thenReturn(true)
     when(r.getFitness).thenReturn(1.0)
@@ -196,7 +199,8 @@ class LaunchCoordinatorTest
     */
   def taskSchedulerBuilder(optimizer: TaskScheduler) = new TaskSchedulerBuilder {
     var leaseRejectAction: Action1[VirtualMachineLease] = null
-    override def withLeaseRejectAction(action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
+    override def withLeaseRejectAction(
+        action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
       leaseRejectAction = action
       this
     }
@@ -225,7 +229,8 @@ class LaunchCoordinatorTest
     val optimizerBuilder = taskSchedulerBuilder(optimizer)
     val schedulerDriver = mock(classOf[SchedulerDriver])
     val trace = Mockito.inOrder(schedulerDriver)
-    val fsm = TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))
+    val fsm =
+      TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))
 
     val framework = randomFramework
     val task1 = randomTask
@@ -234,12 +239,14 @@ class LaunchCoordinatorTest
 
     val slave1 = {
       val slave = randomSlave
-      (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
+      (slave._1, slave._2,
+        randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
     }
 
     val slave2 = {
       val slave = randomSlave
-      (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
+      (slave._1, slave._2,
+        randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
     }
   }
 
@@ -337,10 +344,10 @@ class LaunchCoordinatorTest
           verify(schedulerDriver).suppressOffers()
         }
         "declines any outstanding offers" in new Context {
-          //fsm.setState(GatheringOffers, GatherData(newOffers = Seq(new VMLeaseObject(slave1._3))))
           fsm.setState(GatheringOffers, GatherData())
           fsm ! new Disconnected()
           verify(optimizer).expireAllLeases()
+          verify(optimizer).scheduleOnce(MM.any(), MM.any())
         }
       }
       "Disconnected" which {
@@ -351,7 +358,8 @@ class LaunchCoordinatorTest
           fsm.stateData.tasks should contain only (task1._2)
         }
         "transitions to Suspended with offer queue emptied" in new Context {
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! new Disconnected()
           fsm.stateName should be (Suspended)
           fsm.stateData.newLeases should be (empty)
@@ -359,7 +367,8 @@ class LaunchCoordinatorTest
       }
       "Launch" which {
         "stays in GatheringOffers with updated task queue" in new Context {
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! Launch(Seq(task2._2).asJava)
           fsm.stateName should be (GatheringOffers)
           fsm.stateData.tasks should contain only (task1._2, task2._2)
@@ -368,16 +377,19 @@ class LaunchCoordinatorTest
       }
       "ResourceOffers" which {
         "stays in GatheringOffers with offer queue updated" in new Context {
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! new ResourceOffers(Seq(slave1._4, slave2._3).asJava)
           fsm.stateName should be (GatheringOffers)
           fsm.stateData.tasks should contain only (task1._2)
-          fsm.stateData.newLeases.map(_.getOffer) should contain only (slave1._3, slave1._4, slave2._3)
+          fsm.stateData.newLeases.map(_.getOffer) should contain only
+            (slave1._3, slave1._4, slave2._3)
         }
       }
       "OfferRescinded" which {
         "stays in GatheringOffers with offer queue updated" in new Context {
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! new OfferRescinded(slave1._3.getId)
           verify(optimizer).expireLease(slave1._3.getId.getValue)
           fsm.stateName should be (GatheringOffers)
@@ -387,45 +399,60 @@ class LaunchCoordinatorTest
       }
       "StateTimeout" which {
         "sends AcceptOffers message for matched tasks" in new Context {
-          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
-            val (lease, task) = (newLeases.head, requests.head)
-            schedulingResult(
-              successes = Seq(vmAssignmentResult(lease.hostname(), Seq(lease), Set(taskAssignmentResult(lease, task)))))
-          }
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+            scheduleOnce { (requests, newLeases) =>
+              val (l, task) = (newLeases.head, requests.head)
+              val vm = vmAssignmentResult(l.hostname(), Seq(l), Set(taskAssignmentResult(l, task)))
+              schedulingResult(successes = Seq(vm))
+            }
+          } thenReturn(schedulingResult(successes = Nil))
+
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! StateTimeout
           val offers = expectMsgType[AcceptOffers]
           offers.hostname() should be (slave1._2)
           offers.offerIds() should contain only (slave1._3.getId)
         }
         "transitions to Idle when task queue is empty" in new Context {
-          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
-            val (lease, task) = (newLeases.head, requests.head)
-            schedulingResult(
-              successes = Seq(vmAssignmentResult(lease.hostname(), Seq(lease), Set(taskAssignmentResult(lease, task)))))
-          }
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+            scheduleOnce { (requests, newLeases) =>
+              val (l, task) = (newLeases.head, requests.head)
+              val vm = vmAssignmentResult(l.hostname(), Seq(l), Set(taskAssignmentResult(l, task)))
+              schedulingResult(successes = Seq(vm))
+            }
+          } thenReturn(schedulingResult(successes = Nil))
+
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! StateTimeout
           fsm.stateName should be (Idle)
           fsm.stateData.tasks should be (empty)
           fsm.stateData.newLeases should be (empty)
         }
         "stays in GatheringOffers when task queue is non-empty" in new Context {
-          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
-            schedulingResult(successes = Nil)
+          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+            scheduleOnce { (requests, newLeases) =>
+              schedulingResult(successes = Nil)
+            }
           }
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! StateTimeout
           fsm.stateName should be (GatheringOffers)
           fsm.stateData.tasks should contain only (task1._2)
           fsm.stateData.newLeases should be (empty)
         }
         "declines old offers" in new Context {
-          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
-            optimizerBuilder.leaseRejectAction.call(newLeases.head)
-            schedulingResult(successes = Nil)
-          }
-          fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+          when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+            scheduleOnce { (requests, newLeases) =>
+              optimizerBuilder.leaseRejectAction.call(newLeases.head)
+              schedulingResult(successes = Nil)
+            }
+          } thenReturn(schedulingResult(successes = Nil))
+
+          fsm.setState(GatheringOffers,
+            GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
           fsm ! StateTimeout
           verify(schedulerDriver).declineOffer(slave1._3.getId)
         }
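
The StateTimeout cases above wrap the scheduleOnce answer in braces and chain a trailing thenReturn, so any scheduleOnce invocation after the first one (for example, when the FSM re-enters GatheringOffers) yields an empty scheduling result instead of re-running the answer. A condensed sketch of that stubbing pattern, using the scheduleOnce/vmAssignmentResult/taskAssignmentResult/schedulingResult helpers defined earlier in this test:

    when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
      scheduleOnce { (requests, newLeases) =>
        // first invocation: assign the first pending task to the first new lease
        val (l, task) = (newLeases.head, requests.head)
        val vm = vmAssignmentResult(l.hostname(), Seq(l), Set(taskAssignmentResult(l, task)))
        schedulingResult(successes = Seq(vm))
      }
    } thenReturn (schedulingResult(successes = Nil)) // later invocations: nothing scheduled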

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
index fb64115..c223852 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
@@ -54,7 +54,8 @@ class ReconciliationCoordinatorTest
   def randomTask = {
     val slaveID = Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build()
     val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
-    val taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
+    val taskStatus = Protos.TaskStatus.newBuilder()
+      .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
     (taskID, taskStatus)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
index ff32116..b4ef938 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
@@ -60,7 +60,8 @@ class TaskMonitorTest
 
   def randomTask(slaveID: Protos.SlaveID) = {
     val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
-    val taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
+    val taskStatus = Protos.TaskStatus.newBuilder()
+      .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
     (taskID, taskStatus)
   }
 
@@ -70,7 +71,8 @@ class TaskMonitorTest
     val slave = randomSlave
     val task = randomTask(slave._1)
     val parent = TestProbe()
-    val fsm = TestFSMUtils.testFSMRef(new TaskMonitor(config, schedulerDriver, New(task._1)), parent.ref)
+    val fsm =
+      TestFSMUtils.testFSMRef(new TaskMonitor(config, schedulerDriver, New(task._1)), parent.ref)
     parent.watch(fsm)
   }
 
@@ -79,13 +81,13 @@ class TaskMonitorTest
 
   def handlesStatusUpdate(state: TaskMonitorState) = {
     "StatusUpdate" which {
-      "transitions to Staging when goal state is Launched and status is TASK_STAGING|TASK_STARTING" in new Context {
+      "transitions to Staging when goal state is Launched and status is staging" in new Context {
         fsm.setState(state, StateData(Launched(task._1, slave._1)))
         fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_STAGING).build())
         fsm.stateName should be (Staging)
         fsm.stateData should be (StateData(Launched(task._1, slave._1)))
       }
-      "transitions to Running when goal state is Launched and status is TASK_RUNNING" in new Context {
+      "transitions to Running when goal state is Launched and status is running" in new Context {
         fsm.setState(state, StateData(Launched(task._1, slave._1)))
         fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_RUNNING).build())
         fsm.stateName should be (Running)
@@ -100,13 +102,13 @@ class TaskMonitorTest
         }
         parent.expectTerminated(fsm)
       }
-      "transitions to Killing when goal state is Released and status is TASK_STAGING|TASK_STARTING|TASK_RUNNING" in new Context {
+      "transitions to Killing when goal state is Released and status is running" in new Context {
         fsm.setState(state, StateData(Released(task._1, slave._1)))
         fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_RUNNING).build())
         fsm.stateName should be (Killing)
         fsm.stateData should be (StateData(Released(task._1, slave._1)))
       }
-      "stops when goal state is Released and status is TASK_KILLED" in new Context {
+      "stops when goal state is Released and status is killed" in new Context {
         fsm.setState(state, StateData(Released(task._1, slave._1)))
         fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_KILLED).build())
         parent.fishForMessage() {

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
new file mode 100644
index 0000000..fcf2977
--- /dev/null
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.UUID
+
+import akka.actor._
+import akka.testkit._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.TestFSMUtils
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor._
+import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.mesos.Protos.TaskState._
+import org.apache.mesos.{Protos, SchedulerDriver}
+import org.junit.runner.RunWith
+import org.mockito.Mockito._
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+@RunWith(classOf[JUnitRunner])
+class TasksTest
+    extends WordSpecLike
+    with Matchers
+    with BeforeAndAfterAll {
+
+  lazy val config = new Configuration()
+  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  def randomSlave = {
+    val slaveID = Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build
+    val hostname = s"host-${slaveID.getValue}"
+    (slaveID, hostname)
+  }
+
+  def randomTask(slaveID: Protos.SlaveID) = {
+    val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
+    val taskStatus = Protos.TaskStatus.newBuilder()
+      .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING)
+    (taskID, taskStatus)
+  }
+
+  def childProbe(parent: ActorRefFactory): (TestProbe, ActorRef) = {
+    val probe = TestProbe()
+    val childRef = parent.actorOf(Props(
+      new Actor {
+        override def receive: Receive = {
+          case msg @ _ => probe.ref.forward(msg)
+        }
+      }
+    ))
+    (probe,childRef)
+  }
+
+  class Context(implicit val system: ActorSystem) extends TestKitBase with ImplicitSender {
+
+    case class MockTaskMonitor(probe: TestProbe, actorRef: ActorRef, task: TaskGoalState)
+
+    val schedulerDriver = mock(classOf[SchedulerDriver])
+
+    val slave = randomSlave
+    val task = randomTask(slave._1)
+
+    val taskActors = MutableMap[Protos.TaskID,MockTaskMonitor]()
+
+    val actor = {
+      val taskActorCreator = (factory: ActorRefFactory, task: TaskGoalState) => {
+        val (probe, taskActorRef) = childProbe(factory)
+        taskActors.put(task.taskID, MockTaskMonitor(probe, taskActorRef, task))
+        taskActorRef
+      }
+      TestActorRef[Tasks](
+        Props(classOf[Tasks], config, schedulerDriver, taskActorCreator),
+        testActor,
+        TestFSMUtils.randomName)
+    }
+  }
+
+  def handle = afterWord("handle")
+
+  "Tasks" should handle {
+
+    "(supervision)" which {
+      "escalates" in new Context {
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        watch(actor)
+        taskActors(task._1).actorRef ! Kill
+        expectTerminated(actor)
+      }
+    }
+
+    "Connect" which {
+      "stores the connected message for later use" in new Context {
+        val msg = new Connected() {}
+        actor ! msg
+        actor.underlyingActor.registered should be (Some(msg))
+      }
+
+      "forwards the message to child tasks" in new Context {
+        val msg = new Connected() {}
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        actor ! msg
+        taskActors(task._1).probe.expectMsg(msg)
+      }
+    }
+
+    "Disconnect" which {
+      "releases any connected message that was previously stored" in new Context {
+        actor.underlyingActor.registered = Some(new Connected() {})
+        actor ! new Disconnected()
+        actor.underlyingActor.registered should be (None)
+      }
+
+      "forwards the message to child tasks" in new Context {
+        val msg = new Disconnected() {}
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        actor ! msg
+        taskActors(task._1).probe.expectMsg(msg)
+      }
+    }
+
+    "TaskGoalStateUpdated" which {
+      "creates a task monitor on-demand for a given task" in new Context {
+        val goal = Launched(task._1, slave._1)
+        actor ! TaskGoalStateUpdated(goal)
+        actor.underlyingActor.taskMap.contains(task._1) should be (true)
+        taskActors(task._1).task should be (goal)
+      }
+
+      "forwards the stored connected message to new monitor actors" in new Context {
+        val msg = new Connected() {}
+        val goal = Launched(task._1, slave._1)
+        actor ! msg
+        actor ! TaskGoalStateUpdated(goal)
+        taskActors(task._1).probe.expectMsg(msg)
+      }
+
+      "forwards the goal state to the task monitor" in new Context {
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        val updateMsg = TaskGoalStateUpdated(Released(task._1, slave._1))
+        actor ! updateMsg
+        taskActors(task._1).probe.expectMsg(updateMsg)
+      }
+    }
+
+    "StatusUpdate" which {
+      "forwards the update to a task monitor" in new Context {
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        val msg = new StatusUpdate(task._2.setState(TASK_RUNNING).build())
+        actor ! msg
+        taskActors(task._1).probe.expectMsg(msg)
+      }
+
+      "resumes monitoring of resurrected tasks" in new Context {
+        // in this scenario, no goal state is sent prior to the status update
+        actor ! new StatusUpdate(task._2.setState(TASK_RUNNING).build())
+        taskActors.contains(task._1) should be (true)
+        taskActors(task._1).task should be (Released(task._1, slave._1))
+      }
+    }
+
+    "Reconcile" which {
+      "forwards the message to the parent" in new Context {
+        val msg = new Reconcile(Seq(task._2.build()))
+        actor ! msg
+        expectMsg(msg)
+      }
+    }
+
+    "TaskTerminated" which {
+      "removes the task monitor ref" in new Context {
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        actor.underlyingActor.taskMap.contains(task._1) should be (true)
+        actor ! TaskTerminated(task._1, task._2.setState(TASK_FAILED).build())
+        actor.underlyingActor.taskMap.contains(task._1) should be (false)
+      }
+
+      "forwards to the parent" in new Context {
+        actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+        val msg = TaskTerminated(task._1, task._2.setState(TASK_FAILED).build())
+        actor ! msg
+        expectMsg(msg)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala b/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
index 3252f94..44a7fdc 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
@@ -32,7 +32,8 @@ abstract class FSMSpec extends FSMSpecLike {
 }
 
 /**
-  * Implementation trait for class <code>FSMSpec</code>, which extends wordspec with FSM functionality.
+  * Implementation trait for class <code>FSMSpec</code>, which extends wordspec
+  * with FSM functionality.
   *
   * For example: "MyFSM" when inState {
   *   "Connected" should handle {

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
new file mode 100644
index 0000000..45b404a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
+  * to start/administer/stop the session.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executorService Execution context which is used to execute concurrent tasks in the
+  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
+  * @param timeout Timeout for futures
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  */
+abstract class ContaineredJobManager(
+    flinkConfiguration: FlinkConfiguration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry])
+  extends JobManager(
+    flinkConfiguration,
+    executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricsRegistry) {
+
+  val jobPollingInterval: FiniteDuration
+
+  // indicates if this JM has been started in a dedicated (per-job) mode.
+  var stopWhenJobFinished: JobID = null
+
+  override def handleMessage: Receive = {
+    handleContainerMessage orElse super.handleMessage
+  }
+
+  def handleContainerMessage: Receive = {
+
+    case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
+      // forward to ResourceManager
+      currentResourceManager match {
+        case Some(rm) =>
+          // we forward the message
+          rm.forward(decorateMessage(msg))
+        case None =>
+          // client has to try again
+      }
+
+    case msg: ShutdownClusterAfterJob =>
+      val jobId = msg.jobId()
+      log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
+      stopWhenJobFinished = jobId
+      // trigger regular job status messages (if this is a dedicated/per-job cluster)
+      if (stopWhenJobFinished != null) {
+        context.system.scheduler.schedule(0 seconds,
+          jobPollingInterval,
+          new Runnable {
+            override def run(): Unit = {
+              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
+            }
+          }
+        )(context.dispatcher)
+      }
+
+      sender() ! decorateMessage(Acknowledge)
+
+    case msg: GetClusterStatus =>
+      sender() ! decorateMessage(
+        new GetClusterStatusResponse(
+          instanceManager.getNumberOfRegisteredTaskManagers,
+          instanceManager.getTotalNumberOfSlots)
+      )
+
+    case jnf: JobNotFound =>
+      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
+      if (stopWhenJobFinished == null) {
+        log.warn("The ApplicationMaster didn't expect to receive this message")
+      }
+
+    case jobStatus: CurrentJobStatus =>
+      if (stopWhenJobFinished == null) {
+        log.warn(s"Received job status $jobStatus which wasn't requested.")
+      } else {
+        if (stopWhenJobFinished != jobStatus.jobID) {
+          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
+            s"job $stopWhenJobFinished")
+        } else {
+          if (jobStatus.status.isGloballyTerminalState) {
+            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
+              s"Shutting down session")
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.SUCCEEDED,
+                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
+              )
+            } else {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.FAILED,
+                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+              )
+            }
+          } else {
+            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
+          }
+        }
+      }
+  }
+}
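
With ContaineredJobManager now living in flink-runtime, a framework-specific JobManager only needs to extend it and provide the abstract jobPollingInterval (plus any extra container messages it handles). A hypothetical sketch of such a subclass (MyContainerJobManager is illustrative, not part of this commit, and assumes the same imports as the file above); the YarnJobManager change further below does exactly this with YARN_HEARTBEAT_DELAY:

    class MyContainerJobManager(
        flinkConfiguration: FlinkConfiguration,
        executorService: ExecutorService,
        instanceManager: InstanceManager,
        scheduler: FlinkScheduler,
        libraryCacheManager: BlobLibraryCacheManager,
        archive: ActorRef,
        restartStrategyFactory: RestartStrategyFactory,
        timeout: FiniteDuration,
        leaderElectionService: LeaderElectionService,
        submittedJobGraphs: SubmittedJobGraphStore,
        checkpointRecoveryFactory: CheckpointRecoveryFactory,
        savepointStore: SavepointStore,
        jobRecoveryTimeout: FiniteDuration,
        metricsRegistry: Option[FlinkMetricRegistry])
      extends ContaineredJobManager(
        flinkConfiguration, executorService, instanceManager, scheduler, libraryCacheManager,
        archive, restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphs,
        checkpointRecoveryFactory, savepointStore, jobRecoveryTimeout, metricsRegistry) {

      // how often a dedicated (per-job) cluster polls the monitored job's status
      override val jobPollingInterval: FiniteDuration = 5 seconds
    }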

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 87a2c98..4637b97 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -331,14 +331,6 @@ public class YarnApplicationMasterRunner {
 			// make sure that everything whatever ends up in the log
 			LOG.error("YARN Application Master initialization failed", t);
 
-			if (actorSystem != null) {
-				try {
-					actorSystem.shutdown();
-				} catch (Throwable tt) {
-					LOG.error("Error shutting down actor system", tt);
-				}
-			}
-
 			if (webMonitor != null) {
 				try {
 					webMonitor.stop();
@@ -347,6 +339,14 @@ public class YarnApplicationMasterRunner {
 				}
 			}
 
+			if (actorSystem != null) {
+				try {
+					actorSystem.shutdown();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down actor system", tt);
+				}
+			}
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 94ad9f2..b9d52ae 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,26 +18,20 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{TimeUnit, ExecutorService}
+import java.util.concurrent.{ExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.clusterframework.ApplicationStatus
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.clusterframework.messages._
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -73,7 +67,7 @@ class YarnJobManager(
     savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry])
-  extends JobManager(
+  extends ContaineredJobManager(
     flinkConfiguration,
     executorService,
     instanceManager,
@@ -95,85 +89,5 @@ class YarnJobManager(
       flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
       TimeUnit.SECONDS)
 
-  // indicates if this AM has been started in a detached mode.
-  var stopWhenJobFinished: JobID = null
-
-  override def handleMessage: Receive = {
-    handleYarnMessage orElse super.handleMessage
-  }
-
-  def handleYarnMessage: Receive = {
-
-    case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
-      // forward to ResourceManager
-      currentResourceManager match {
-        case Some(rm) =>
-          // we forward the message
-          rm.forward(decorateMessage(msg))
-        case None =>
-          // client has to try again
-      }
-
-    case msg: ShutdownClusterAfterJob =>
-      val jobId = msg.jobId()
-      log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
-      stopWhenJobFinished = jobId
-      // trigger regular job status messages (if this is a per-job yarn cluster)
-      if (stopWhenJobFinished != null) {
-        context.system.scheduler.schedule(0 seconds,
-          YARN_HEARTBEAT_DELAY,
-          new Runnable {
-            override def run(): Unit = {
-              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
-            }
-          }
-        )(context.dispatcher)
-      }
-
-      sender() ! decorateMessage(Acknowledge)
-
-    case msg: GetClusterStatus =>
-      sender() ! decorateMessage(
-        new GetClusterStatusResponse(
-          instanceManager.getNumberOfRegisteredTaskManagers,
-          instanceManager.getTotalNumberOfSlots)
-      )
-
-    case jnf: JobNotFound =>
-      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
-      if (stopWhenJobFinished == null) {
-        log.warn("The ApplicationMaster didn't expect to receive this message")
-      }
-
-    case jobStatus: CurrentJobStatus =>
-      if (stopWhenJobFinished == null) {
-        log.warn(s"Received job status $jobStatus which wasn't requested.")
-      } else {
-        if (stopWhenJobFinished != jobStatus.jobID) {
-          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
-            s"job $stopWhenJobFinished")
-        } else {
-          if (jobStatus.status.isGloballyTerminalState) {
-            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
-              s"Shutting down YARN session")
-            if (jobStatus.status == JobStatus.FINISHED) {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.SUCCEEDED,
-                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
-              )
-            } else {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.FAILED,
-                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
-              )
-            }
-          } else {
-            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
-          }
-        }
-      }
-  }
-
+  override val jobPollingInterval = YARN_HEARTBEAT_DELAY
 }