You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/10/14 00:12:56 UTC
spark git commit: [SPARK-11034][LAUNCHER][MESOS] Launcher: add
support for monitoring Mesos apps
Repository: spark
Updated Branches:
refs/heads/master 1bb8b7604 -> 06df34d35
[SPARK-11034][LAUNCHER][MESOS] Launcher: add support for monitoring Mesos apps
## What changes were proposed in this pull request?
Added Launcher support for monitoring Mesos apps in Client mode. SPARK-11033 can handle the support for Mesos/Cluster mode since the Standalone/Cluster and Mesos/Cluster modes use the same code at client side.
## How was this patch tested?
I verified it manually by running launcher application, able to launch, stop and kill the mesos applications and also can invoke other launcher API's.
Author: Devaraj K <de...@apache.org>
Closes #19385 from devaraj-kavali/SPARK-11034.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06df34d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06df34d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06df34d3
Branch: refs/heads/master
Commit: 06df34d35ec088277445ef09cfb24bfe996f072e
Parents: 1bb8b76
Author: Devaraj K <de...@apache.org>
Authored: Fri Oct 13 17:12:50 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Oct 13 17:12:50 2017 -0700
----------------------------------------------------------------------
.../MesosCoarseGrainedSchedulerBackend.scala | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/06df34d3/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 80c0a04..603c980 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskStat
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointAddress
@@ -89,6 +90,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Synchronization protected by stateLock
private[this] var stopCalled: Boolean = false
+ private val launcherBackend = new LauncherBackend() {
+ override protected def onStopRequest(): Unit = {
+ stopSchedulerBackend()
+ setState(SparkAppHandle.State.KILLED)
+ }
+ }
+
// If shuffle service is enabled, the Spark driver will register with the shuffle service.
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -182,6 +190,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def start() {
super.start()
+ if (sc.deployMode == "client") {
+ launcherBackend.connect()
+ }
val startedBefore = IdHelper.startedBefore.getAndSet(true)
val suffix = if (startedBefore) {
@@ -202,6 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)
+ launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
startScheduler(driver)
}
@@ -295,15 +307,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
this.mesosExternalShuffleClient.foreach(_.init(appId))
this.schedulerDriver = driver
markRegistered()
+ launcherBackend.setAppId(appId)
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
}
- override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
+ override def disconnected(d: org.apache.mesos.SchedulerDriver) {
+ launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
+ }
- override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
+ override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
+ }
/**
* Method called by Mesos to offer resources on slaves. We respond by launching an executor,
@@ -611,6 +629,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
override def stop() {
+ stopSchedulerBackend()
+ launcherBackend.setState(SparkAppHandle.State.FINISHED)
+ launcherBackend.close()
+ }
+
+ private def stopSchedulerBackend() {
// Make sure we're not launching tasks during shutdown
stateLock.synchronized {
if (stopCalled) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org