You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/10/14 00:12:56 UTC

spark git commit: [SPARK-11034][LAUNCHER][MESOS] Launcher: add support for monitoring Mesos apps

Repository: spark
Updated Branches:
  refs/heads/master 1bb8b7604 -> 06df34d35


[SPARK-11034][LAUNCHER][MESOS] Launcher: add support for monitoring Mesos apps

## What changes were proposed in this pull request?

Added Launcher support for monitoring Mesos apps in client mode. SPARK-11033 can handle support for Mesos cluster mode, since the Standalone/Cluster and Mesos/Cluster modes use the same code on the client side.

## How was this patch tested?

I verified it manually by running a launcher application; I was able to launch, stop, and kill Mesos applications, and could also invoke the other launcher APIs.

Author: Devaraj K <de...@apache.org>

Closes #19385 from devaraj-kavali/SPARK-11034.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06df34d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06df34d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06df34d3

Branch: refs/heads/master
Commit: 06df34d35ec088277445ef09cfb24bfe996f072e
Parents: 1bb8b76
Author: Devaraj K <de...@apache.org>
Authored: Fri Oct 13 17:12:50 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Oct 13 17:12:50 2017 -0700

----------------------------------------------------------------------
 .../MesosCoarseGrainedSchedulerBackend.scala    | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06df34d3/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 80c0a04..603c980 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskStat
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -89,6 +90,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Synchronization protected by stateLock
   private[this] var stopCalled: Boolean = false
 
+  private val launcherBackend = new LauncherBackend() {
+    override protected def onStopRequest(): Unit = {
+      stopSchedulerBackend()
+      setState(SparkAppHandle.State.KILLED)
+    }
+  }
+
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -182,6 +190,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def start() {
     super.start()
 
+    if (sc.deployMode == "client") {
+      launcherBackend.connect()
+    }
     val startedBefore = IdHelper.startedBefore.getAndSet(true)
 
     val suffix = if (startedBefore) {
@@ -202,6 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )
 
+    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
     startScheduler(driver)
   }
 
@@ -295,15 +307,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     this.mesosExternalShuffleClient.foreach(_.init(appId))
     this.schedulerDriver = driver
     markRegistered()
+    launcherBackend.setAppId(appId)
+    launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
   }
 
-  override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
+  override def disconnected(d: org.apache.mesos.SchedulerDriver) {
+    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
+  }
 
-  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
+  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {
+    launcherBackend.setState(SparkAppHandle.State.RUNNING)
+  }
 
   /**
    * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
@@ -611,6 +629,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   override def stop() {
+    stopSchedulerBackend()
+    launcherBackend.setState(SparkAppHandle.State.FINISHED)
+    launcherBackend.close()
+  }
+
+  private def stopSchedulerBackend() {
     // Make sure we're not launching tasks during shutdown
     stateLock.synchronized {
       if (stopCalled) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org