You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/02 03:37:59 UTC

spark git commit: [SPARK-7216] [MESOS] Add driver details page to Mesos cluster UI.

Repository: spark
Updated Branches:
  refs/heads/master 099327d53 -> 202219341


[SPARK-7216] [MESOS] Add driver details page to Mesos cluster UI.

Add a details page to the Mesos cluster UI that displays information about a single Mesos driver.

Author: Timothy Chen <tn...@gmail.com>

Closes #5763 from tnachen/mesos_cluster_page and squashes the following commits:

55f36eb [Timothy Chen] Add driver details page to Mesos cluster UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20221934
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20221934
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20221934

Branch: refs/heads/master
Commit: 2022193412e832393a29b94609841c3ffe8e3d66
Parents: 099327d
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri May 1 18:36:42 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 1 18:36:42 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/mesos/ui/DriverPage.scala      | 180 +++++++++++++++++++
 .../deploy/mesos/ui/MesosClusterPage.scala      |   9 +-
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |   1 +
 .../deploy/rest/mesos/MesosRestServer.scala     |   6 +-
 .../cluster/mesos/MesosClusterScheduler.scala   |  33 +++-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   4 +-
 6 files changed, 222 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
new file mode 100644
index 0000000..be8560d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.{MesosClusterSubmissionState, MesosClusterRetryState}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+
+/**
+ * Web UI page that shows the details of a single driver known to the Mesos cluster scheduler.
+ *
+ * It uses the "driver" page prefix and selects the driver via the `id` request parameter.
+ */
+private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
+
+  /**
+   * Renders the details of the driver named by the `id` request parameter.
+   *
+   * @param request HTTP request; must carry a non-empty `id` parameter
+   * @return the details page, or a short "Cannot find driver" page when the scheduler
+   *         returns no state for that id
+   * @throws IllegalArgumentException if the `id` parameter is missing or empty
+   */
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val driverId = request.getParameter("id")
+    require(driverId != null && driverId.nonEmpty, "Missing id parameter")
+
+    val content: Seq[Node] = parent.scheduler.getDriverState(driverId) match {
+      case None =>
+        // Unknown id: show a short notice instead of the tables.
+        <div>
+          <p>Cannot find driver {driverId}</p>
+        </div>
+      case Some(driverState) =>
+        val description = driverState.description
+        // Each table gets a single-element collection; its row function emits every row.
+        val driverTable = UIUtils.listingTable(
+          Seq("Driver property", "Value"), driverRow, Iterable(description))
+        val commandTable = UIUtils.listingTable(
+          Seq("Command property", "Value"), commandRow, Iterable(description.command))
+        val commandEnvTable = UIUtils.listingTable(
+          Seq("Command environment variable", "Value"),
+          propertiesRow,
+          Iterable(description.command.environment))
+        val schedulerTable = UIUtils.listingTable(
+          Seq("Scheduler property", "Value"),
+          propertiesRow,
+          Iterable(description.schedulerProperties))
+        val launchedTable = UIUtils.listingTable(
+          Seq("Launched property", "Value"), launchedRow, Iterable(driverState.submissionState))
+        val retryTable = UIUtils.listingTable(
+          Seq("Last failed status", "Next retry time", "Retry count"),
+          retryRow,
+          Iterable(description.retryState))
+        <p>Driver state information for driver id {driverId}</p>
+        <a href="/">Back to Drivers</a>
+        <div class="row-fluid">
+          <div class="span12">
+            <h4>Driver state: {driverState.state}</h4>
+            <h4>Driver properties</h4>
+            {driverTable}
+            <h4>Driver command</h4>
+            {commandTable}
+            <h4>Driver command environment</h4>
+            {commandEnvTable}
+            <h4>Scheduler properties</h4>
+            {schedulerTable}
+            <h4>Launched state</h4>
+            {launchedTable}
+            <h4>Retry state</h4>
+            {retryTable}
+          </div>
+        </div>
+    }
+    UIUtils.basicSparkPage(content, s"Details for Job $driverId")
+  }
+
+  /** Rows of the "Launched state" table; empty when the driver has no submission state. */
+  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
+    submissionState.map { state =>
+      <tr>
+        <td>Mesos Slave ID</td><td>{state.slaveId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Mesos Task ID</td><td>{state.taskId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Launch Time</td><td>{state.startDate}</td>
+      </tr>
+      <tr>
+        <td>Finish Time</td><td>{state.finishDate.map(_.toString).getOrElse("")}</td>
+      </tr>
+      <tr>
+        <td>Last Task Status</td><td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
+      </tr>
+    }.getOrElse(Seq.empty[Node])
+  }
+
+  /** One two-column row per key/value pair of `properties`. */
+  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
+    properties.map { case (k, v) =>
+      <tr>
+        <td>{k}</td><td>{v}</td>
+      </tr>
+    }.toSeq
+  }
+
+  /** Rows of the "Driver command" table, one per displayed field of `command`. */
+  private def commandRow(command: Command): Seq[Node] = {
+    <tr>
+      <td>Main class</td><td>{command.mainClass}</td>
+    </tr>
+    <tr>
+      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Java options</td><td>{command.javaOpts.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Library path entries</td><td>{command.libraryPathEntries.mkString(" ")}</td>
+    </tr>
+  }
+
+  /** Rows of the "Driver properties" table, one per displayed field of `driver`. */
+  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
+    <tr>
+      <td>Name</td><td>{driver.name}</td>
+    </tr>
+    <tr>
+      <td>Id</td><td>{driver.submissionId}</td>
+    </tr>
+    <tr>
+      <td>Cores</td><td>{driver.cores}</td>
+    </tr>
+    <tr>
+      <td>Memory</td><td>{driver.mem}</td>
+    </tr>
+    <tr>
+      <td>Submitted</td><td>{driver.submissionDate}</td>
+    </tr>
+    <tr>
+      <td>Supervise</td><td>{driver.supervise}</td>
+    </tr>
+  }
+
+  /** The single "Retry state" row; empty when the driver has no retry state. */
+  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
+    retryState.map { state =>
+      <tr>
+        <td>{state.lastFailureStatus}</td>
+        <td>{state.nextRetry}</td>
+        <td>{state.retries}</td>
+      </tr>
+    }.getOrElse(Seq.empty[Node])
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
index 7b2005e..7419fa9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -56,8 +56,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
   }
 
   private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+    val id = submission.submissionId
     <tr>
-      <td>{submission.submissionId}</td>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
       <td>{submission.submissionDate}</td>
       <td>{submission.command.mainClass}</td>
       <td>cpus: {submission.cores}, mem: {submission.mem}</td>
@@ -65,8 +66,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
   }
 
   private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+    val id = state.driverDescription.submissionId
     <tr>
-      <td>{state.driverDescription.submissionId}</td>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
       <td>{state.driverDescription.submissionDate}</td>
       <td>{state.driverDescription.command.mainClass}</td>
       <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
@@ -77,8 +79,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
   }
 
   private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+    val id = submission.submissionId
     <tr>
-      <td>{submission.submissionId}</td>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
       <td>{submission.submissionDate}</td>
       <td>{submission.command.mainClass}</td>
       <td>{submission.retryState.get.lastFailureStatus}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
index 4865d46..3f69354 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -39,6 +39,7 @@ private[spark] class MesosClusterUI(
 
   override def initialize() {
     attachPage(new MesosClusterPage(this))
+    attachPage(new DriverPage(this)) // per-driver details page
     attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index fd17a98..8198296 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -53,7 +53,7 @@ private[spark] class MesosRestServer(
     new MesosStatusRequestServlet(scheduler, masterConf)
 }
 
-private[deploy] class MesosSubmitRequestServlet(
+private[mesos] class MesosSubmitRequestServlet(
     scheduler: MesosClusterScheduler,
     conf: SparkConf)
   extends SubmitRequestServlet {
@@ -139,7 +139,7 @@ private[deploy] class MesosSubmitRequestServlet(
   }
 }
 
-private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
   extends KillRequestServlet {
   protected override def handleKill(submissionId: String): KillSubmissionResponse = {
     val k = scheduler.killDriver(submissionId)
@@ -148,7 +148,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler,
   }
 }
 
-private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
   extends StatusRequestServlet {
   protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
     val d = scheduler.getDriverStatus(submissionId)

http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0396e62..06f0e28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -50,12 +50,13 @@ private[spark] class MesosClusterSubmissionState(
     val taskId: TaskID,
     val slaveId: SlaveID,
     var mesosTaskStatus: Option[TaskStatus],
-    var startDate: Date)
+    var startDate: Date,
+    var finishDate: Option[Date])
   extends Serializable {
 
   def copy(): MesosClusterSubmissionState = {
     new MesosClusterSubmissionState(
-      driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
+      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
   }
 }
 
@@ -96,6 +97,14 @@ private[spark] class MesosClusterSchedulerState(
     val pendingRetryDrivers: Iterable[MesosDriverDescription])
 
 /**
+ * The full state of a single Mesos driver, used to display driver information on the UI.
+ */
+private[spark] class MesosDriverState(
+    val state: String, // display label of the driver's current state
+    val description: MesosDriverDescription, // what was submitted
+    val submissionState: Option[MesosClusterSubmissionState] = None) // launch details, if any
+
+/**
  * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
  * as Mesos tasks in a Mesos cluster.
  * All drivers are launched asynchronously by the framework, which will eventually be launched
@@ -233,6 +242,22 @@ private[spark] class MesosClusterScheduler(
     s
   }
 
+  /** Finds a driver's state for the Web UI: queued, launched, finished, then pending retry. */
+  def getDriverState(submissionId: String): Option[MesosDriverState] = {
+    def launchedAs(label: String, s: MesosClusterSubmissionState): MesosDriverState =
+      new MesosDriverState(label, s.driverDescription, Some(s))
+    stateLock.synchronized {
+      def queued = queuedDrivers.find(_.submissionId.equals(submissionId))
+      def running = launchedDrivers.get(submissionId)
+      def finished = finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
+      def retrying = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+      queued.map(d => new MesosDriverState("QUEUED", d))
+        .orElse(running.map(launchedAs("RUNNING", _)))
+        .orElse(finished.map(launchedAs("FINISHED", _)))
+        .orElse(retrying.map(d => new MesosDriverState("RETRYING", d)))
+    }
+  }
+
   private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
 
   /**
@@ -439,7 +464,7 @@ private[spark] class MesosClusterScheduler(
         logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
           submission.submissionId)
         val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
-          None, new Date())
+          None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
         afterLaunchCallback(submission.submissionId)
@@ -534,6 +559,7 @@ private[spark] class MesosClusterScheduler(
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
           removeFromLaunchedDrivers(taskId)
+          state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
@@ -546,6 +572,7 @@ private[spark] class MesosClusterScheduler(
           pendingRetryDriversState.persist(taskId, newDriverDescription)
         } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
           removeFromLaunchedDrivers(taskId)
+          state.finishDate = Some(new Date())
           if (finishedDrivers.size >= retainedDrivers) {
             val toRemove = math.max(retainedDrivers / 10, 1)
             finishedDrivers.trimStart(toRemove)

http://git-wip-us.apache.org/repos/asf/spark/blob/20221934/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 8346a24..86a7d0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.spark.executor.MesosExecutorBackend
@@ -56,7 +56,7 @@ private[spark] class MesosSchedulerBackend(
 
   // The listener bus to publish executor added/removed events.
   val listenerBus = sc.listenerBus
-  
+
   private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
 
   @volatile var appId: String = _


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org