You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 15:33:26 UTC
[6/9] flink git commit: [FLINK-1984] Mesos ResourceManager - T1
milestone (3)
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
deleted file mode 100644
index 49c86b5..0000000
--- a/flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework
-
-import java.util.concurrent.{TimeUnit, ExecutorService}
-
-import akka.actor.ActorRef
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.clusterframework.ApplicationStatus
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.clusterframework.messages._
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-
-/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
- * to start/administer/stop the session.
- *
- * @param flinkConfiguration Configuration object for the actor
- * @param executorService Execution context which is used to execute concurrent tasks in the
- * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
- * @param instanceManager Instance manager to manage the registered
- * [[org.apache.flink.runtime.taskmanager.TaskManager]]
- * @param scheduler Scheduler to schedule Flink jobs
- * @param libraryCacheManager Manager to manage uploaded jar files
- * @param archive Archive for finished Flink jobs
- * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
- * @param timeout Timeout for futures
- * @param leaderElectionService LeaderElectionService to participate in the leader election
- */
-abstract class ContaineredJobManager(
- flinkConfiguration: FlinkConfiguration,
- executorService: ExecutorService,
- instanceManager: InstanceManager,
- scheduler: FlinkScheduler,
- libraryCacheManager: BlobLibraryCacheManager,
- archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
- timeout: FiniteDuration,
- leaderElectionService: LeaderElectionService,
- submittedJobGraphs : SubmittedJobGraphStore,
- checkpointRecoveryFactory : CheckpointRecoveryFactory,
- savepointStore: SavepointStore,
- jobRecoveryTimeout: FiniteDuration,
- metricsRegistry: Option[FlinkMetricRegistry])
- extends JobManager(
- flinkConfiguration,
- executorService,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archive,
- restartStrategyFactory,
- timeout,
- leaderElectionService,
- submittedJobGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricsRegistry) {
-
- val jobPollingInterval: FiniteDuration
-
- // indicates if this JM has been started in a dedicated (per-job) mode.
- var stopWhenJobFinished: JobID = null
-
- override def handleMessage: Receive = {
- handleContainerMessage orElse super.handleMessage
- }
-
- def handleContainerMessage: Receive = {
-
- case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
- // forward to ResourceManager
- currentResourceManager match {
- case Some(rm) =>
- // we forward the message
- rm.forward(decorateMessage(msg))
- case None =>
- // client has to try again
- }
-
- case msg: ShutdownClusterAfterJob =>
- val jobId = msg.jobId()
- log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
- stopWhenJobFinished = jobId
- // trigger regular job status messages (if this is a dedicated/per-job cluster)
- if (stopWhenJobFinished != null) {
- context.system.scheduler.schedule(0 seconds,
- jobPollingInterval,
- new Runnable {
- override def run(): Unit = {
- self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
- }
- }
- )(context.dispatcher)
- }
-
- sender() ! decorateMessage(Acknowledge)
-
- case msg: GetClusterStatus =>
- sender() ! decorateMessage(
- new GetClusterStatusResponse(
- instanceManager.getNumberOfRegisteredTaskManagers,
- instanceManager.getTotalNumberOfSlots)
- )
-
- case jnf: JobNotFound =>
- log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
- if (stopWhenJobFinished == null) {
- log.warn("The ApplicationMaster didn't expect to receive this message")
- }
-
- case jobStatus: CurrentJobStatus =>
- if (stopWhenJobFinished == null) {
- log.warn(s"Received job status $jobStatus which wasn't requested.")
- } else {
- if (stopWhenJobFinished != jobStatus.jobID) {
- log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
- s"job $stopWhenJobFinished")
- } else {
- if (jobStatus.status.isTerminalState) {
- log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
- s"Shutting down session")
- if (jobStatus.status == JobStatus.FINISHED) {
- self ! decorateMessage(
- new StopCluster(
- ApplicationStatus.SUCCEEDED,
- s"The monitored job with ID ${jobStatus.jobID} has finished.")
- )
- } else {
- self ! decorateMessage(
- new StopCluster(
- ApplicationStatus.FAILED,
- s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
- )
- }
- } else {
- log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 05fb033..f287e13 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -235,9 +235,9 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state then initialize the RM
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1);
- MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newTask(task2).launchTask(slave1, slave1host);
- MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newTask(task3).launchTask(slave1, slave1host).releaseTask();
+ MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+ MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+ MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3));
initialize();
@@ -276,7 +276,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state then initialize the RM
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
initialize();
@@ -306,7 +306,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial state with a (recovered) launched worker
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
initialize();
@@ -339,7 +339,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state, initialize the RM, then register with task1 as a registered worker
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
initialize();
@@ -351,12 +351,13 @@ public class MesosFlinkResourceManagerTest {
// verify that the worker was persisted, the internal state was updated, the task router was notified,
// and the launch coordinator was notified about the host assignment change
- MesosWorkerStore.Worker worker2Released = worker1.releaseTask();
+ MesosWorkerStore.Worker worker2Released = worker1.releaseWorker();
verify(workerStore).putWorker(worker2Released);
assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task1), worker2Released));
resourceManagerInstance.launchCoordinator.expectMsg(new LaunchCoordinator.Unassign(task1, slave1host));
// send the subsequent terminated message
+ when(workerStore.removeWorker(task1)).thenReturn(true);
resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FINISHED).build()));
@@ -391,7 +392,7 @@ public class MesosFlinkResourceManagerTest {
// verify that a new worker was persisted, the internal state was updated, the task router was notified,
// and the launch coordinator was asked to launch a task
- MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newTask(task1);
+ MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1);
verify(workerStore).putWorker(expected);
assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
@@ -438,7 +439,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state with a new task then initialize the RM
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newTask(task1);
+ MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
initialize();
@@ -455,7 +456,7 @@ public class MesosFlinkResourceManagerTest {
// verify that the worker was persisted, the internal state was updated,
// Mesos was asked to launch task1, and the task router was notified
- MesosWorkerStore.Worker worker1launched = worker1.launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host);
verify(workerStore).putWorker(worker1launched);
assertThat(resourceManagerInstance.workersInNew.entrySet(), empty());
assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
@@ -505,7 +506,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state with a launched worker that hasn't yet registered
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
initialize();
@@ -513,6 +514,7 @@ public class MesosFlinkResourceManagerTest {
// tell the RM that a task failed (and prepare a replacement task)
when(workerStore.newTaskID()).thenReturn(task2);
+ when(workerStore.removeWorker(task1)).thenReturn(true);
resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
@@ -540,7 +542,7 @@ public class MesosFlinkResourceManagerTest {
protected void run() {
try {
// set the initial persistent state with a launched & registered worker
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newTask(task1).launchTask(slave1, slave1host);
+ MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
initialize();
@@ -548,6 +550,7 @@ public class MesosFlinkResourceManagerTest {
// tell the RM that a task failed (and prepare a replacement task)
when(workerStore.newTaskID()).thenReturn(task2);
+ when(workerStore.removeWorker(task1)).thenReturn(true);
resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
@@ -582,7 +585,7 @@ public class MesosFlinkResourceManagerTest {
// verify that the Mesos framework is shutdown
verify(schedulerDriver).stop(false);
- verify(workerStore).cleanup();
+ verify(workerStore).stop(true);
expectTerminated(resourceManager.actor());
}
catch(Exception ex) {
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
index 3ab72cd..80186cf 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/Utils.scala
@@ -31,7 +31,8 @@ object Matchers {
def contentsMatch[T](plan: Seq[T]): java.util.Collection[T] = {
org.mockito.Matchers.argThat(new ArgumentMatcher[java.util.Collection[T]] {
override def matches(o: scala.Any): Boolean = o match {
- case actual: java.util.Collection[T] => actual.size() == plan.size && actual.containsAll(plan.asJava)
+ case actual: java.util.Collection[T] =>
+ actual.size() == plan.size && actual.containsAll(plan.asJava)
case _ => false
}
})
@@ -46,7 +47,8 @@ object TestFSMUtils {
"$" + akka.util.Helpers.base64(l)
}
- def testFSMRef[S, D, T <: Actor: ClassTag](factory: \u21d2 T, supervisor: ActorRef)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
+ def testFSMRef[S, D, T <: Actor: ClassTag](factory: \u21d2 T, supervisor: ActorRef)
+ (implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
new TestFSMRef(system, Props(factory), supervisor, TestFSMUtils.randomName)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
index be7d788..34c1f66 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
@@ -93,7 +93,9 @@ class LaunchCoordinatorTest
val task: LaunchableTask = new LaunchableTask() {
override def taskRequest: TaskRequest = generateTaskRequest
- override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
+ override def launch(
+ slaveId: SlaveID,
+ taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
Protos.TaskInfo.newBuilder
.setTaskId(taskID).setName(taskID.getValue)
.setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
@@ -135,11 +137,12 @@ class LaunchCoordinatorTest
*/
def taskAssignmentResult(lease: VirtualMachineLease, task: TaskRequest): TaskAssignmentResult = {
val ports = lease.portRanges().get(0)
+ val assignedPorts = ports.getBeg to ports.getBeg + task.getPorts
val r = mock(classOf[TaskAssignmentResult])
when(r.getTaskId).thenReturn(task.getId)
when(r.getHostname).thenReturn(lease.hostname())
when(r.getAssignedPorts).thenReturn(
- (ports.getBeg to ports.getBeg + task.getPorts).toList.asJava.asInstanceOf[java.util.List[Integer]])
+ assignedPorts.toList.asJava.asInstanceOf[java.util.List[Integer]])
when(r.getRequest).thenReturn(task)
when(r.isSuccessful).thenReturn(true)
when(r.getFitness).thenReturn(1.0)
@@ -196,7 +199,8 @@ class LaunchCoordinatorTest
*/
def taskSchedulerBuilder(optimizer: TaskScheduler) = new TaskSchedulerBuilder {
var leaseRejectAction: Action1[VirtualMachineLease] = null
- override def withLeaseRejectAction(action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
+ override def withLeaseRejectAction(
+ action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
leaseRejectAction = action
this
}
@@ -225,7 +229,8 @@ class LaunchCoordinatorTest
val optimizerBuilder = taskSchedulerBuilder(optimizer)
val schedulerDriver = mock(classOf[SchedulerDriver])
val trace = Mockito.inOrder(schedulerDriver)
- val fsm = TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))
+ val fsm =
+ TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))
val framework = randomFramework
val task1 = randomTask
@@ -234,12 +239,14 @@ class LaunchCoordinatorTest
val slave1 = {
val slave = randomSlave
- (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
+ (slave._1, slave._2,
+ randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
}
val slave2 = {
val slave = randomSlave
- (slave._1, slave._2, randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
+ (slave._1, slave._2,
+ randomOffer(framework, slave), randomOffer(framework, slave), randomOffer(framework, slave))
}
}
@@ -337,10 +344,10 @@ class LaunchCoordinatorTest
verify(schedulerDriver).suppressOffers()
}
"declines any outstanding offers" in new Context {
- //fsm.setState(GatheringOffers, GatherData(newOffers = Seq(new VMLeaseObject(slave1._3))))
fsm.setState(GatheringOffers, GatherData())
fsm ! new Disconnected()
verify(optimizer).expireAllLeases()
+ verify(optimizer).scheduleOnce(MM.any(), MM.any())
}
}
"Disconnected" which {
@@ -351,7 +358,8 @@ class LaunchCoordinatorTest
fsm.stateData.tasks should contain only (task1._2)
}
"transitions to Suspended with offer queue emptied" in new Context {
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! new Disconnected()
fsm.stateName should be (Suspended)
fsm.stateData.newLeases should be (empty)
@@ -359,7 +367,8 @@ class LaunchCoordinatorTest
}
"Launch" which {
"stays in GatheringOffers with updated task queue" in new Context {
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! Launch(Seq(task2._2).asJava)
fsm.stateName should be (GatheringOffers)
fsm.stateData.tasks should contain only (task1._2, task2._2)
@@ -368,16 +377,19 @@ class LaunchCoordinatorTest
}
"ResourceOffers" which {
"stays in GatheringOffers with offer queue updated" in new Context {
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! new ResourceOffers(Seq(slave1._4, slave2._3).asJava)
fsm.stateName should be (GatheringOffers)
fsm.stateData.tasks should contain only (task1._2)
- fsm.stateData.newLeases.map(_.getOffer) should contain only (slave1._3, slave1._4, slave2._3)
+ fsm.stateData.newLeases.map(_.getOffer) should contain only
+ (slave1._3, slave1._4, slave2._3)
}
}
"OfferRescinded" which {
"stays in GatheringOffers with offer queue updated" in new Context {
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! new OfferRescinded(slave1._3.getId)
verify(optimizer).expireLease(slave1._3.getId.getValue)
fsm.stateName should be (GatheringOffers)
@@ -387,45 +399,60 @@ class LaunchCoordinatorTest
}
"StateTimeout" which {
"sends AcceptOffers message for matched tasks" in new Context {
- when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
- val (lease, task) = (newLeases.head, requests.head)
- schedulingResult(
- successes = Seq(vmAssignmentResult(lease.hostname(), Seq(lease), Set(taskAssignmentResult(lease, task)))))
- }
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+ scheduleOnce { (requests, newLeases) =>
+ val (l, task) = (newLeases.head, requests.head)
+ val vm = vmAssignmentResult(l.hostname(), Seq(l), Set(taskAssignmentResult(l, task)))
+ schedulingResult(successes = Seq(vm))
+ }
+ } thenReturn(schedulingResult(successes = Nil))
+
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! StateTimeout
val offers = expectMsgType[AcceptOffers]
offers.hostname() should be (slave1._2)
offers.offerIds() should contain only (slave1._3.getId)
}
"transitions to Idle when task queue is empty" in new Context {
- when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
- val (lease, task) = (newLeases.head, requests.head)
- schedulingResult(
- successes = Seq(vmAssignmentResult(lease.hostname(), Seq(lease), Set(taskAssignmentResult(lease, task)))))
- }
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+ scheduleOnce { (requests, newLeases) =>
+ val (l, task) = (newLeases.head, requests.head)
+ val vm = vmAssignmentResult(l.hostname(), Seq(l), Set(taskAssignmentResult(l, task)))
+ schedulingResult(successes = Seq(vm))
+ }
+ } thenReturn(schedulingResult(successes = Nil))
+
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! StateTimeout
fsm.stateName should be (Idle)
fsm.stateData.tasks should be (empty)
fsm.stateData.newLeases should be (empty)
}
"stays in GatheringOffers when task queue is non-empty" in new Context {
- when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
- schedulingResult(successes = Nil)
+ when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+ scheduleOnce { (requests, newLeases) =>
+ schedulingResult(successes = Nil)
+ }
}
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! StateTimeout
fsm.stateName should be (GatheringOffers)
fsm.stateData.tasks should contain only (task1._2)
fsm.stateData.newLeases should be (empty)
}
"declines old offers" in new Context {
- when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer scheduleOnce { (requests, newLeases) =>
- optimizerBuilder.leaseRejectAction.call(newLeases.head)
- schedulingResult(successes = Nil)
- }
- fsm.setState(GatheringOffers, GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
+ when(optimizer.scheduleOnce(MM.any(), MM.any())) thenAnswer {
+ scheduleOnce { (requests, newLeases) =>
+ optimizerBuilder.leaseRejectAction.call(newLeases.head)
+ schedulingResult(successes = Nil)
+ }
+ } thenReturn(schedulingResult(successes = Nil))
+
+ fsm.setState(GatheringOffers,
+ GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! StateTimeout
verify(schedulerDriver).declineOffer(slave1._3.getId)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
index fb64115..c223852 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
@@ -54,7 +54,8 @@ class ReconciliationCoordinatorTest
def randomTask = {
val slaveID = Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build()
val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
- val taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
+ val taskStatus = Protos.TaskStatus.newBuilder()
+ .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
(taskID, taskStatus)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
index ff32116..b4ef938 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
@@ -60,7 +60,8 @@ class TaskMonitorTest
def randomTask(slaveID: Protos.SlaveID) = {
val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
- val taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
+ val taskStatus = Protos.TaskStatus.newBuilder()
+ .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING).build()
(taskID, taskStatus)
}
@@ -70,7 +71,8 @@ class TaskMonitorTest
val slave = randomSlave
val task = randomTask(slave._1)
val parent = TestProbe()
- val fsm = TestFSMUtils.testFSMRef(new TaskMonitor(config, schedulerDriver, New(task._1)), parent.ref)
+ val fsm =
+ TestFSMUtils.testFSMRef(new TaskMonitor(config, schedulerDriver, New(task._1)), parent.ref)
parent.watch(fsm)
}
@@ -79,13 +81,13 @@ class TaskMonitorTest
def handlesStatusUpdate(state: TaskMonitorState) = {
"StatusUpdate" which {
- "transitions to Staging when goal state is Launched and status is TASK_STAGING|TASK_STARTING" in new Context {
+ "transitions to Staging when goal state is Launched and status is staging" in new Context {
fsm.setState(state, StateData(Launched(task._1, slave._1)))
fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_STAGING).build())
fsm.stateName should be (Staging)
fsm.stateData should be (StateData(Launched(task._1, slave._1)))
}
- "transitions to Running when goal state is Launched and status is TASK_RUNNING" in new Context {
+ "transitions to Running when goal state is Launched and status is running" in new Context {
fsm.setState(state, StateData(Launched(task._1, slave._1)))
fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_RUNNING).build())
fsm.stateName should be (Running)
@@ -100,13 +102,13 @@ class TaskMonitorTest
}
parent.expectTerminated(fsm)
}
- "transitions to Killing when goal state is Released and status is TASK_STAGING|TASK_STARTING|TASK_RUNNING" in new Context {
+ "transitions to Killing when goal state is Released and status is running" in new Context {
fsm.setState(state, StateData(Released(task._1, slave._1)))
fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_RUNNING).build())
fsm.stateName should be (Killing)
fsm.stateData should be (StateData(Released(task._1, slave._1)))
}
- "stops when goal state is Released and status is TASK_KILLED" in new Context {
+ "stops when goal state is Released and status is killed" in new Context {
fsm.setState(state, StateData(Released(task._1, slave._1)))
fsm ! new StatusUpdate(task._2.toBuilder.setState(TASK_KILLED).build())
parent.fishForMessage() {
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
new file mode 100644
index 0000000..fcf2977
--- /dev/null
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.UUID
+
+import akka.actor._
+import akka.testkit._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.TestFSMUtils
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor._
+import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.mesos.Protos.TaskState._
+import org.apache.mesos.{Protos, SchedulerDriver}
+import org.junit.runner.RunWith
+import org.mockito.Mockito._
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+ * Tests for the `Tasks` actor.
+ *
+ * Each test case runs inside a fresh `Context`, which creates the actor under test as a
+ * `TestActorRef[Tasks]` and replaces the task monitor children with forwarding actors that
+ * relay every message to a `TestProbe`.
+ */
+@RunWith(classOf[JUnitRunner])
+class TasksTest
+ extends WordSpecLike
+ with Matchers
+ with BeforeAndAfterAll {
+
+ // a single actor system is created lazily and shared by all test cases of this suite
+ lazy val config = new Configuration()
+ implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+
+ // shuts down the shared actor system once all tests have run
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ // creates a (slaveID, hostname) pair; the slave ID value is a random UUID
+ def randomSlave = {
+ val slaveID = Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build
+ val hostname = s"host-${slaveID.getValue}"
+ (slaveID, hostname)
+ }
+
+ // creates a (taskID, status builder) pair for a task on the given slave.
+ // note that the second element is a TaskStatus *builder* initialized to TASK_STAGING;
+ // the tests below set a state on it and call build() to obtain a status.
+ def randomTask(slaveID: Protos.SlaveID) = {
+ val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
+ val taskStatus = Protos.TaskStatus.newBuilder()
+ .setTaskId(taskID).setSlaveId(slaveID).setState(TASK_STAGING)
+ (taskID, taskStatus)
+ }
+
+ // creates an actor via the given factory that forwards every message it receives to a
+ // fresh TestProbe, and returns (probe, forwarding actor ref)
+ def childProbe(parent: ActorRefFactory): (TestProbe, ActorRef) = {
+ val probe = TestProbe()
+ val childRef = parent.actorOf(Props(
+ new Actor {
+ override def receive: Receive = {
+ case msg @ _ => probe.ref.forward(msg)
+ }
+ }
+ ))
+ (probe,childRef)
+ }
+
+ // per-test fixture: a mocked scheduler driver, one random slave/task and the actor under test
+ class Context(implicit val system: ActorSystem) extends TestKitBase with ImplicitSender {
+
+ // record of a stand-in task monitor: its probe, its actor ref and the goal state it was
+ // created with
+ case class MockTaskMonitor(probe: TestProbe, actorRef: ActorRef, task: TaskGoalState)
+
+ val schedulerDriver = mock(classOf[SchedulerDriver])
+
+ val slave = randomSlave
+ val task = randomTask(slave._1)
+
+ // stand-in task monitors created so far, keyed by task ID (filled by taskActorCreator)
+ val taskActors = MutableMap[Protos.TaskID,MockTaskMonitor]()
+
+ val actor = {
+ // creator function handed to Tasks; instead of a real task monitor it creates a
+ // probe-backed forwarding actor and records it in taskActors
+ val taskActorCreator = (factory: ActorRefFactory, task: TaskGoalState) => {
+ val (probe, taskActorRef) = childProbe(factory)
+ taskActors.put(task.taskID, MockTaskMonitor(probe, taskActorRef, task))
+ taskActorRef
+ }
+ // testActor is passed alongside the props so that the "forwards ... to the parent" tests
+ // can assert on parent-bound messages with expectMsg
+ TestActorRef[Tasks](
+ Props(classOf[Tasks], config, schedulerDriver, taskActorCreator),
+ testActor,
+ TestFSMUtils.randomName)
+ }
+ }
+
+ // after-word so that the specs read as: "Tasks" should handle { ... }
+ def handle = afterWord("handle")
+
+ "Tasks" should handle {
+
+ "(supervision)" which {
+ "escalates" in new Context {
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ watch(actor)
+ // killing the child is expected to terminate the actor under test as well
+ taskActors(task._1).actorRef ! Kill
+ expectTerminated(actor)
+ }
+ }
+
+ "Connect" which {
+ "stores the connected message for later use" in new Context {
+ val msg = new Connected() {}
+ actor ! msg
+ actor.underlyingActor.registered should be (Some(msg))
+ }
+
+ "forwards the message to child tasks" in new Context {
+ val msg = new Connected() {}
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ actor ! msg
+ taskActors(task._1).probe.expectMsg(msg)
+ }
+ }
+
+ "Disconnect" which {
+ "releases any connected message that was previously stored" in new Context {
+ actor.underlyingActor.registered = Some(new Connected() {})
+ actor ! new Disconnected()
+ actor.underlyingActor.registered should be (None)
+ }
+
+ "forwards the message to child tasks" in new Context {
+ val msg = new Disconnected() {}
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ actor ! msg
+ taskActors(task._1).probe.expectMsg(msg)
+ }
+ }
+
+ "TaskGoalStateUpdated" which {
+ "creates a task monitor on-demand for a given task" in new Context {
+ val goal = Launched(task._1, slave._1)
+ actor ! TaskGoalStateUpdated(goal)
+ actor.underlyingActor.taskMap.contains(task._1) should be (true)
+ taskActors(task._1).task should be (goal)
+ }
+
+ "forwards the stored connected message to new monitor actors" in new Context {
+ val msg = new Connected() {}
+ val goal = Launched(task._1, slave._1)
+ // the connected message is sent before any monitor exists for the task
+ actor ! msg
+ actor ! TaskGoalStateUpdated(goal)
+ taskActors(task._1).probe.expectMsg(msg)
+ }
+
+ "forwards the goal state to the task monitor" in new Context {
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ val updateMsg = TaskGoalStateUpdated(Released(task._1, slave._1))
+ actor ! updateMsg
+ taskActors(task._1).probe.expectMsg(updateMsg)
+ }
+ }
+
+ "StatusUpdate" which {
+ "forwards the update to a task monitor" in new Context {
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ val msg = new StatusUpdate(task._2.setState(TASK_RUNNING).build())
+ actor ! msg
+ taskActors(task._1).probe.expectMsg(msg)
+ }
+
+ "resumes monitoring of resurrected tasks" in new Context {
+ // in this scenario, no goal state is sent prior to the status update
+ actor ! new StatusUpdate(task._2.setState(TASK_RUNNING).build())
+ taskActors.contains(task._1) should be (true)
+ // the monitor for the unknown task is expected to be created with goal state Released
+ taskActors(task._1).task should be (Released(task._1, slave._1))
+ }
+ }
+
+ "Reconcile" which {
+ "forwards the message to the parent" in new Context {
+ val msg = new Reconcile(Seq(task._2.build()))
+ actor ! msg
+ expectMsg(msg)
+ }
+ }
+
+ "TaskTerminated" which {
+ "removes the task monitor ref" in new Context {
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ actor.underlyingActor.taskMap.contains(task._1) should be (true)
+ actor ! TaskTerminated(task._1, task._2.setState(TASK_FAILED).build())
+ actor.underlyingActor.taskMap.contains(task._1) should be (false)
+ }
+
+ "forwards to the parent" in new Context {
+ actor ! TaskGoalStateUpdated(Launched(task._1, slave._1))
+ val msg = TaskTerminated(task._1, task._2.setState(TASK_FAILED).build())
+ actor ! msg
+ expectMsg(msg)
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala b/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
index 3252f94..44a7fdc 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/runtime/akka/FSMSpec.scala
@@ -32,7 +32,8 @@ abstract class FSMSpec extends FSMSpecLike {
}
/**
- * Implementation trait for class <code>FSMSpec</code>, which extends wordspec with FSM functionality.
+ * Implementation trait for class <code>FSMSpec</code>, which extends wordspec
+ * with FSM functionality.
*
* For example: "MyFSM" when inState {
* "Connected" should handle {
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
new file mode 100644
index 0000000..45b404a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with
+  * additional messages to start/administer/stop the session.
+  *
+  * When a [[ShutdownClusterAfterJob]] message is received, the actor periodically sends itself
+  * a [[RequestJobStatus]] for that job (every `jobPollingInterval`) and sends itself a
+  * [[StopCluster]] message once the job has reached a globally terminal state.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executorService Execution context which is used to execute concurrent tasks in the
+  *                        [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
+  * @param timeout Timeout for futures
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  * @param submittedJobGraphs Passed through unchanged to the [[JobManager]] constructor
+  * @param checkpointRecoveryFactory Passed through unchanged to the [[JobManager]] constructor
+  * @param savepointStore Passed through unchanged to the [[JobManager]] constructor
+  * @param jobRecoveryTimeout Passed through unchanged to the [[JobManager]] constructor
+  * @param metricsRegistry Passed through unchanged to the [[JobManager]] constructor
+  */
+abstract class ContaineredJobManager(
+    flinkConfiguration: FlinkConfiguration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry])
+  extends JobManager(
+    flinkConfiguration,
+    executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricsRegistry) {
+
+  /** Interval at which the status of the monitored job is polled; defined by subclasses. */
+  val jobPollingInterval: FiniteDuration
+
+  // indicates if this JM has been started in a dedicated (per-job) mode.
+  var stopWhenJobFinished: JobID = null
+
+  // handle of the currently scheduled job status poll (if any). It is kept so that a repeated
+  // ShutdownClusterAfterJob message replaces the previous poll instead of adding another one.
+  private var jobStatusPoller: Option[akka.actor.Cancellable] = None
+
+  override def handleMessage: Receive = {
+    handleContainerMessage orElse super.handleMessage
+  }
+
+  /** Handles the container-specific messages; all other messages fall through to the
+    * [[JobManager]] (see `handleMessage`).
+    */
+  def handleContainerMessage: Receive = {
+
+    case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
+      // forward to ResourceManager
+      currentResourceManager match {
+        case Some(rm) =>
+          // we forward the message
+          rm.forward(decorateMessage(msg))
+        case None =>
+          // client has to try again
+      }
+
+    case msg: ShutdownClusterAfterJob =>
+      val jobId = msg.jobId()
+      log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
+      stopWhenJobFinished = jobId
+
+      // drop a poll that was registered by an earlier message, otherwise every further
+      // ShutdownClusterAfterJob message would add one more never-ending periodic task
+      jobStatusPoller.foreach(_.cancel())
+
+      // trigger regular job status messages (if this is a dedicated/per-job cluster)
+      jobStatusPoller = Option(jobId).map { monitoredJob =>
+        context.system.scheduler.schedule(
+          0.seconds,
+          jobPollingInterval,
+          new Runnable {
+            override def run(): Unit = {
+              // runs on a scheduler thread: use the captured immutable job ID rather than
+              // reading the mutable actor field 'stopWhenJobFinished' from outside the actor
+              self ! decorateMessage(RequestJobStatus(monitoredJob))
+            }
+          }
+        )(context.dispatcher)
+      }
+
+      sender() ! decorateMessage(Acknowledge)
+
+    case msg: GetClusterStatus =>
+      sender() ! decorateMessage(
+        new GetClusterStatusResponse(
+          instanceManager.getNumberOfRegisteredTaskManagers,
+          instanceManager.getTotalNumberOfSlots)
+      )
+
+    case jnf: JobNotFound =>
+      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
+      if (stopWhenJobFinished == null) {
+        log.warn("The ApplicationMaster didn't expect to receive this message")
+      }
+
+    case jobStatus: CurrentJobStatus =>
+      if (stopWhenJobFinished == null) {
+        log.warn(s"Received job status $jobStatus which wasn't requested.")
+      } else {
+        if (stopWhenJobFinished != jobStatus.jobID) {
+          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
+            s"job $stopWhenJobFinished")
+        } else {
+          if (jobStatus.status.isGloballyTerminalState) {
+            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
+              s"Shutting down session")
+            // only FINISHED counts as success, every other terminal state is reported as failed
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.SUCCEEDED,
+                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
+              )
+            } else {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.FAILED,
+                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+              )
+            }
+          } else {
+            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
+          }
+        }
+      }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 87a2c98..4637b97 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -331,14 +331,6 @@ public class YarnApplicationMasterRunner {
// make sure that everything whatever ends up in the log
LOG.error("YARN Application Master initialization failed", t);
- if (actorSystem != null) {
- try {
- actorSystem.shutdown();
- } catch (Throwable tt) {
- LOG.error("Error shutting down actor system", tt);
- }
- }
-
if (webMonitor != null) {
try {
webMonitor.stop();
@@ -347,6 +339,14 @@ public class YarnApplicationMasterRunner {
}
}
+ if (actorSystem != null) {
+ try {
+ actorSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.error("Error shutting down actor system", tt);
+ }
+ }
+
return INIT_ERROR_EXIT_CODE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eeea4c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 94ad9f2..b9d52ae 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,26 +18,20 @@
package org.apache.flink.yarn
-import java.util.concurrent.{TimeUnit, ExecutorService}
+import java.util.concurrent.{ExecutorService, TimeUnit}
import akka.actor.ActorRef
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.clusterframework.ApplicationStatus
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.clusterframework.messages._
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.clusterframework.ContaineredJobManager
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -73,7 +67,7 @@ class YarnJobManager(
savepointStore: SavepointStore,
jobRecoveryTimeout: FiniteDuration,
metricsRegistry: Option[MetricRegistry])
- extends JobManager(
+ extends ContaineredJobManager(
flinkConfiguration,
executorService,
instanceManager,
@@ -95,85 +89,5 @@ class YarnJobManager(
flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
TimeUnit.SECONDS)
- // indicates if this AM has been started in a detached mode.
- var stopWhenJobFinished: JobID = null
-
- override def handleMessage: Receive = {
- handleYarnMessage orElse super.handleMessage
- }
-
- def handleYarnMessage: Receive = {
-
- case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
- // forward to ResourceManager
- currentResourceManager match {
- case Some(rm) =>
- // we forward the message
- rm.forward(decorateMessage(msg))
- case None =>
- // client has to try again
- }
-
- case msg: ShutdownClusterAfterJob =>
- val jobId = msg.jobId()
- log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
- stopWhenJobFinished = jobId
- // trigger regular job status messages (if this is a per-job yarn cluster)
- if (stopWhenJobFinished != null) {
- context.system.scheduler.schedule(0 seconds,
- YARN_HEARTBEAT_DELAY,
- new Runnable {
- override def run(): Unit = {
- self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
- }
- }
- )(context.dispatcher)
- }
-
- sender() ! decorateMessage(Acknowledge)
-
- case msg: GetClusterStatus =>
- sender() ! decorateMessage(
- new GetClusterStatusResponse(
- instanceManager.getNumberOfRegisteredTaskManagers,
- instanceManager.getTotalNumberOfSlots)
- )
-
- case jnf: JobNotFound =>
- log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
- if (stopWhenJobFinished == null) {
- log.warn("The ApplicationMaster didn't expect to receive this message")
- }
-
- case jobStatus: CurrentJobStatus =>
- if (stopWhenJobFinished == null) {
- log.warn(s"Received job status $jobStatus which wasn't requested.")
- } else {
- if (stopWhenJobFinished != jobStatus.jobID) {
- log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
- s"job $stopWhenJobFinished")
- } else {
- if (jobStatus.status.isGloballyTerminalState) {
- log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
- s"Shutting down YARN session")
- if (jobStatus.status == JobStatus.FINISHED) {
- self ! decorateMessage(
- new StopCluster(
- ApplicationStatus.SUCCEEDED,
- s"The monitored job with ID ${jobStatus.jobID} has finished.")
- )
- } else {
- self ! decorateMessage(
- new StopCluster(
- ApplicationStatus.FAILED,
- s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
- )
- }
- } else {
- log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
- }
- }
- }
- }
-
+ override val jobPollingInterval = YARN_HEARTBEAT_DELAY
}