You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2017/11/14 16:34:41 UTC

[4/5] spark git commit: [SPARK-20648][CORE] Port JobsTab and StageTab to the new UI backend.

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index dc5b03c..e4cf99e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -22,120 +22,121 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.status.PoolData
+import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
   private val sc = parent.sc
-  private val listener = parent.progressListener
   private def isFairScheduler = parent.isFairScheduler
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val activeStages = listener.activeStages.values.toSeq
-      val pendingStages = listener.pendingStages.values.toSeq
-      val completedStages = listener.completedStages.reverse
-      val numCompletedStages = listener.numCompletedStages
-      val failedStages = listener.failedStages.reverse
-      val numFailedStages = listener.numFailedStages
-      val subPath = "stages"
+    val allStages = parent.store.stageList(null)
 
-      val activeStagesTable =
-        new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath, subPath,
-          parent.progressListener, parent.isFairScheduler,
-          killEnabled = parent.killEnabled, isFailedStage = false)
-      val pendingStagesTable =
-        new StageTableBase(request, pendingStages, "pending", "pendingStage", parent.basePath,
-          subPath, parent.progressListener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = false)
-      val completedStagesTable =
-        new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath,
-          subPath, parent.progressListener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = false)
-      val failedStagesTable =
-        new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath, subPath,
-          parent.progressListener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = true)
+    val activeStages = allStages.filter(_.status == StageStatus.ACTIVE)
+    val pendingStages = allStages.filter(_.status == StageStatus.PENDING)
+    val completedStages = allStages.filter(_.status == StageStatus.COMPLETE)
+    val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse
 
-      // For now, pool information is only accessible in live UIs
-      val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
-      val poolTable = new PoolTable(pools, parent)
+    val numCompletedStages = completedStages.size
+    val numFailedStages = failedStages.size
+    val subPath = "stages"
 
-      val shouldShowActiveStages = activeStages.nonEmpty
-      val shouldShowPendingStages = pendingStages.nonEmpty
-      val shouldShowCompletedStages = completedStages.nonEmpty
-      val shouldShowFailedStages = failedStages.nonEmpty
+    val activeStagesTable =
+      new StageTableBase(parent.store, request, activeStages, "active", "activeStage",
+        parent.basePath, subPath, parent.isFairScheduler, parent.killEnabled, false)
+    val pendingStagesTable =
+      new StageTableBase(parent.store, request, pendingStages, "pending", "pendingStage",
+        parent.basePath, subPath, parent.isFairScheduler, false, false)
+    val completedStagesTable =
+      new StageTableBase(parent.store, request, completedStages, "completed", "completedStage",
+        parent.basePath, subPath, parent.isFairScheduler, false, false)
+    val failedStagesTable =
+      new StageTableBase(parent.store, request, failedStages, "failed", "failedStage",
+        parent.basePath, subPath, parent.isFairScheduler, false, true)
 
-      val completedStageNumStr = if (numCompletedStages == completedStages.size) {
-        s"$numCompletedStages"
-      } else {
-        s"$numCompletedStages, only showing ${completedStages.size}"
-      }
+    // For now, pool information is only accessible in live UIs
+    val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]).map { pool =>
+      val uiPool = parent.store.asOption(parent.store.pool(pool.name)).getOrElse(
+        new PoolData(pool.name, Set()))
+      pool -> uiPool
+    }.toMap
+    val poolTable = new PoolTable(pools, parent)
+
+    val shouldShowActiveStages = activeStages.nonEmpty
+    val shouldShowPendingStages = pendingStages.nonEmpty
+    val shouldShowCompletedStages = completedStages.nonEmpty
+    val shouldShowFailedStages = failedStages.nonEmpty
+
+    val completedStageNumStr = if (numCompletedStages == completedStages.size) {
+      s"$numCompletedStages"
+    } else {
+      s"$numCompletedStages, only showing ${completedStages.size}"
+    }
 
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            {
-              if (shouldShowActiveStages) {
-                <li>
-                  <a href="#active"><strong>Active Stages:</strong></a>
-                  {activeStages.size}
-                </li>
-              }
+    val summary: NodeSeq =
+      <div>
+        <ul class="unstyled">
+          {
+            if (shouldShowActiveStages) {
+              <li>
+                <a href="#active"><strong>Active Stages:</strong></a>
+                {activeStages.size}
+              </li>
             }
-            {
-              if (shouldShowPendingStages) {
-                <li>
-                  <a href="#pending"><strong>Pending Stages:</strong></a>
-                  {pendingStages.size}
-                </li>
-              }
+          }
+          {
+            if (shouldShowPendingStages) {
+              <li>
+                <a href="#pending"><strong>Pending Stages:</strong></a>
+                {pendingStages.size}
+              </li>
             }
-            {
-              if (shouldShowCompletedStages) {
-                <li id="completed-summary">
-                  <a href="#completed"><strong>Completed Stages:</strong></a>
-                  {completedStageNumStr}
-                </li>
-              }
+          }
+          {
+            if (shouldShowCompletedStages) {
+              <li id="completed-summary">
+                <a href="#completed"><strong>Completed Stages:</strong></a>
+                {completedStageNumStr}
+              </li>
             }
-            {
-              if (shouldShowFailedStages) {
-                <li>
-                  <a href="#failed"><strong>Failed Stages:</strong></a>
-                  {numFailedStages}
-                </li>
-              }
+          }
+          {
+            if (shouldShowFailedStages) {
+              <li>
+                <a href="#failed"><strong>Failed Stages:</strong></a>
+                {numFailedStages}
+              </li>
             }
-          </ul>
-        </div>
-
-      var content = summary ++
-        {
-          if (sc.isDefined && isFairScheduler) {
-            <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq
-          } else {
-            Seq.empty[Node]
           }
+        </ul>
+      </div>
+
+    var content = summary ++
+      {
+        if (sc.isDefined && isFairScheduler) {
+          <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq
+        } else {
+          Seq.empty[Node]
         }
-      if (shouldShowActiveStages) {
-        content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-        activeStagesTable.toNodeSeq
-      }
-      if (shouldShowPendingStages) {
-        content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
-        pendingStagesTable.toNodeSeq
-      }
-      if (shouldShowCompletedStages) {
-        content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++
-        completedStagesTable.toNodeSeq
-      }
-      if (shouldShowFailedStages) {
-        content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
-        failedStagesTable.toNodeSeq
       }
-      UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
+    if (shouldShowActiveStages) {
+      content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+      activeStagesTable.toNodeSeq
     }
+    if (shouldShowPendingStages) {
+      content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
+      pendingStagesTable.toNodeSeq
+    }
+    if (shouldShowCompletedStages) {
+      content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++
+      completedStagesTable.toNodeSeq
+    }
+    if (shouldShowFailedStages) {
+      content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
+      failedStagesTable.toNodeSeq
+    }
+    UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 07a41d1..41d42b5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,44 +17,19 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.StageData
 import org.apache.spark.ui.{ToolTips, UIUtils}
-import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Stage summary grouped by executors. */
-private[ui] class ExecutorTable(
-    stageId: Int,
-    stageAttemptId: Int,
-    parent: StagesTab,
-    store: AppStatusStore) {
-  private val listener = parent.progressListener
+private[ui] class ExecutorTable(stage: StageData, store: AppStatusStore) {
 
-  def toNodeSeq: Seq[Node] = {
-    listener.synchronized {
-      executorTable()
-    }
-  }
-
-  /** Special table which merges two header cells. */
-  private def executorTable[T](): Seq[Node] = {
-    val stageData = listener.stageIdToData.get((stageId, stageAttemptId))
-    var hasInput = false
-    var hasOutput = false
-    var hasShuffleWrite = false
-    var hasShuffleRead = false
-    var hasBytesSpilled = false
-    stageData.foreach { data =>
-        hasInput = data.hasInput
-        hasOutput = data.hasOutput
-        hasShuffleRead = data.hasShuffleRead
-        hasShuffleWrite = data.hasShuffleWrite
-        hasBytesSpilled = data.hasBytesSpilled
-    }
+  import ApiHelper._
 
+  def toNodeSeq: Seq[Node] = {
     <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
       <thead>
         <th id="executorid">Executor ID</th>
@@ -64,29 +39,29 @@ private[ui] class ExecutorTable(
         <th>Failed Tasks</th>
         <th>Killed Tasks</th>
         <th>Succeeded Tasks</th>
-        {if (hasInput) {
+        {if (hasInput(stage)) {
           <th>
             <span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / Records</span>
           </th>
         }}
-        {if (hasOutput) {
+        {if (hasOutput(stage)) {
           <th>
             <span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / Records</span>
           </th>
         }}
-        {if (hasShuffleRead) {
+        {if (hasShuffleRead(stage)) {
           <th>
             <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>
             Shuffle Read Size / Records</span>
           </th>
         }}
-        {if (hasShuffleWrite) {
+        {if (hasShuffleWrite(stage)) {
           <th>
             <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>
             Shuffle Write Size / Records</span>
           </th>
         }}
-        {if (hasBytesSpilled) {
+        {if (hasBytesSpilled(stage)) {
           <th>Shuffle Spill (Memory)</th>
           <th>Shuffle Spill (Disk)</th>
         }}
@@ -97,7 +72,7 @@ private[ui] class ExecutorTable(
         </th>
       </thead>
       <tbody>
-        {createExecutorTable()}
+        {createExecutorTable(stage)}
       </tbody>
     </table>
     <script>
@@ -111,68 +86,57 @@ private[ui] class ExecutorTable(
     </script>
   }
 
-  private def createExecutorTable() : Seq[Node] = {
-    // Make an executor-id -> address map
-    val executorIdToAddress = mutable.HashMap[String, String]()
-    listener.blockManagerIds.foreach { blockManagerId =>
-      val address = blockManagerId.hostPort
-      val executorId = blockManagerId.executorId
-      executorIdToAddress.put(executorId, address)
-    }
-
-    listener.stageIdToData.get((stageId, stageAttemptId)) match {
-      case Some(stageData: StageUIData) =>
-        stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
-          <tr>
-            <td>
-              <div style="float: left">{k}</div>
-              <div style="float: right">
-              {
-                store.executorSummary(k).map(_.executorLogs).getOrElse(Map.empty).map {
-                  case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
-                }
-              }
-              </div>
-            </td>
-            <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
-            <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
-            <td>{v.failedTasks + v.succeededTasks + v.reasonToNumKilled.values.sum}</td>
-            <td>{v.failedTasks}</td>
-            <td>{v.reasonToNumKilled.values.sum}</td>
-            <td>{v.succeededTasks}</td>
-            {if (stageData.hasInput) {
-              <td sorttable_customkey={v.inputBytes.toString}>
-                {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
-              </td>
-            }}
-            {if (stageData.hasOutput) {
-              <td sorttable_customkey={v.outputBytes.toString}>
-                {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
-              </td>
-            }}
-            {if (stageData.hasShuffleRead) {
-              <td sorttable_customkey={v.shuffleRead.toString}>
-                {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"}
-              </td>
-            }}
-            {if (stageData.hasShuffleWrite) {
-              <td sorttable_customkey={v.shuffleWrite.toString}>
-                {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
-              </td>
-            }}
-            {if (stageData.hasBytesSpilled) {
-              <td sorttable_customkey={v.memoryBytesSpilled.toString}>
-                {Utils.bytesToString(v.memoryBytesSpilled)}
-              </td>
-              <td sorttable_customkey={v.diskBytesSpilled.toString}>
-                {Utils.bytesToString(v.diskBytesSpilled)}
-              </td>
-            }}
-            <td>{v.isBlacklisted}</td>
-          </tr>
-        }
-      case None =>
-        Seq.empty[Node]
+  private def createExecutorTable(stage: StageData) : Seq[Node] = {
+    stage.executorSummary.getOrElse(Map.empty).toSeq.sortBy(_._1).map { case (k, v) =>
+      val executor = store.asOption(store.executorSummary(k))
+      <tr>
+        <td>
+          <div style="float: left">{k}</div>
+          <div style="float: right">
+          {
+            executor.map(_.executorLogs).getOrElse(Map.empty).map {
+              case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
+            }
+          }
+          </div>
+        </td>
+        <td>{executor.map { e => e.hostPort }.getOrElse("CANNOT FIND ADDRESS")}</td>
+        <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
+        <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td>
+        <td>{v.failedTasks}</td>
+        <td>{v.killedTasks}</td>
+        <td>{v.succeededTasks}</td>
+        {if (hasInput(stage)) {
+          <td sorttable_customkey={v.inputBytes.toString}>
+            {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
+          </td>
+        }}
+        {if (hasOutput(stage)) {
+          <td sorttable_customkey={v.outputBytes.toString}>
+            {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
+          </td>
+        }}
+        {if (hasShuffleRead(stage)) {
+          <td sorttable_customkey={v.shuffleRead.toString}>
+            {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"}
+          </td>
+        }}
+        {if (hasShuffleWrite(stage)) {
+          <td sorttable_customkey={v.shuffleWrite.toString}>
+            {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
+          </td>
+        }}
+        {if (hasBytesSpilled(stage)) {
+          <td sorttable_customkey={v.memoryBytesSpilled.toString}>
+            {Utils.bytesToString(v.memoryBytesSpilled)}
+          </td>
+          <td sorttable_customkey={v.diskBytesSpilled.toString}>
+            {Utils.bytesToString(v.diskBytesSpilled)}
+          </td>
+        }}
+        <td>{executor.map(_.isBlacklisted).getOrElse(false)}</td>
+      </tr>
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 7ed0164..740f12e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
-import java.util.{Date, Locale}
+import java.util.Locale
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{Buffer, ListBuffer}
@@ -27,11 +27,12 @@ import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.scheduler._
-import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.ui._
 
 /** Page showing statistics and stage list for a given job */
-private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
+private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("job") {
 
   private val STAGES_LEGEND =
     <div class="legend-area"><svg width="150px" height="85px">
@@ -56,14 +57,15 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
       <text x="35px" y="42px">Removed</text>
     </svg></div>.toString.filter(_ != '\n')
 
-  private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = {
+  private def makeStageEvent(stageInfos: Seq[v1.StageData]): Seq[String] = {
     stageInfos.map { stage =>
       val stageId = stage.stageId
       val attemptId = stage.attemptId
       val name = stage.name
-      val status = stage.getStatusString
-      val submissionTime = stage.submissionTime.get
-      val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis())
+      val status = stage.status.toString
+      val submissionTime = stage.submissionTime.get.getTime()
+      val completionTime = stage.completionTime.map(_.getTime())
+        .getOrElse(System.currentTimeMillis())
 
       // The timeline library treats contents as HTML, so we have to escape them. We need to add
       // extra layers of escaping in order to embed this in a Javascript string literal.
@@ -79,10 +81,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
          |   'data-placement="top" data-html="true"' +
          |   'data-title="${jsEscapedName} (Stage ${stageId}.${attemptId})<br>' +
          |   'Status: ${status.toUpperCase(Locale.ROOT)}<br>' +
-         |   'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
+         |   'Submitted: ${UIUtils.formatDate(submissionTime)}' +
          |   '${
                  if (status != "running") {
-                   s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}"""
+                   s"""<br>Completed: ${UIUtils.formatDate(completionTime)}"""
                  } else {
                    ""
                  }
@@ -93,7 +95,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
     }
   }
 
-  def makeExecutorEvent(executors: Seq[ExecutorSummary]): Seq[String] = {
+  def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]): Seq[String] = {
     val events = ListBuffer[String]()
     executors.foreach { e =>
       val addedEvent =
@@ -137,8 +139,8 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
   }
 
   private def makeTimeline(
-      stages: Seq[StageInfo],
-      executors: Seq[ExecutorSummary],
+      stages: Seq[v1.StageData],
+      executors: Seq[v1.ExecutorSummary],
       appStartTime: Long): Seq[Node] = {
 
     val stageEventJsonAsStrSeq = makeStageEvent(stages)
@@ -182,173 +184,181 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val listener = parent.jobProgresslistener
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val parameterId = UIUtils.stripXSS(request.getParameter("id"))
-      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
-      val jobId = parameterId.toInt
-      val jobDataOption = listener.jobIdToData.get(jobId)
-      if (jobDataOption.isEmpty) {
-        val content =
-          <div id="no-info">
-            <p>No information to display for job {jobId}</p>
-          </div>
-        return UIUtils.headerSparkPage(
-          s"Details for Job $jobId", content, parent)
-      }
-      val jobData = jobDataOption.get
-      val isComplete = jobData.status != JobExecutionStatus.RUNNING
-      val stages = jobData.stageIds.map { stageId =>
-        // This could be empty if the JobProgressListener hasn't received information about the
-        // stage or if the stage information has been garbage collected
-        listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+    val jobId = parameterId.toInt
+    val jobData = store.asOption(store.job(jobId)).getOrElse {
+      val content =
+        <div id="no-info">
+          <p>No information to display for job {jobId}</p>
+        </div>
+      return UIUtils.headerSparkPage(
+        s"Details for Job $jobId", content, parent)
+    }
+    val isComplete = jobData.status != JobExecutionStatus.RUNNING
+    val stages = jobData.stageIds.map { stageId =>
+      // This could be empty if the listener hasn't received information about the
+      // stage or if the stage information has been garbage collected
+      store.stageData(stageId).lastOption.getOrElse {
+        new v1.StageData(
+          v1.StageStatus.PENDING,
+          stageId,
+          0, 0, 0, 0, 0, 0, 0,
+          0L, 0L, None, None, None, None,
+          0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
+          "Unknown",
+          None,
+          "Unknown",
+          null,
+          Nil,
+          Nil,
+          None,
+          None,
+          Map())
       }
+    }
 
-      val activeStages = Buffer[StageInfo]()
-      val completedStages = Buffer[StageInfo]()
-      // If the job is completed, then any pending stages are displayed as "skipped":
-      val pendingOrSkippedStages = Buffer[StageInfo]()
-      val failedStages = Buffer[StageInfo]()
-      for (stage <- stages) {
-        if (stage.submissionTime.isEmpty) {
-          pendingOrSkippedStages += stage
-        } else if (stage.completionTime.isDefined) {
-          if (stage.failureReason.isDefined) {
-            failedStages += stage
-          } else {
-            completedStages += stage
-          }
+    val activeStages = Buffer[v1.StageData]()
+    val completedStages = Buffer[v1.StageData]()
+    // If the job is completed, then any pending stages are displayed as "skipped":
+    val pendingOrSkippedStages = Buffer[v1.StageData]()
+    val failedStages = Buffer[v1.StageData]()
+    for (stage <- stages) {
+      if (stage.submissionTime.isEmpty) {
+        pendingOrSkippedStages += stage
+      } else if (stage.completionTime.isDefined) {
+        if (stage.status == v1.StageStatus.FAILED) {
+          failedStages += stage
         } else {
-          activeStages += stage
+          completedStages += stage
         }
+      } else {
+        activeStages += stage
       }
+    }
 
-      val basePath = "jobs/job"
+    val basePath = "jobs/job"
 
-      val pendingOrSkippedTableId =
-        if (isComplete) {
-          "pending"
-        } else {
-          "skipped"
-        }
+    val pendingOrSkippedTableId =
+      if (isComplete) {
+        "pending"
+      } else {
+        "skipped"
+      }
 
-      val activeStagesTable =
-        new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath,
-          basePath, parent.jobProgresslistener, parent.isFairScheduler,
-          killEnabled = parent.killEnabled, isFailedStage = false)
-      val pendingOrSkippedStagesTable =
-        new StageTableBase(request, pendingOrSkippedStages, pendingOrSkippedTableId, "pendingStage",
-          parent.basePath, basePath, parent.jobProgresslistener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = false)
-      val completedStagesTable =
-        new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath,
-          basePath, parent.jobProgresslistener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = false)
-      val failedStagesTable =
-        new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath,
-          basePath, parent.jobProgresslistener, parent.isFairScheduler,
-          killEnabled = false, isFailedStage = true)
+    val activeStagesTable =
+      new StageTableBase(store, request, activeStages, "active", "activeStage", parent.basePath,
+        basePath, parent.isFairScheduler,
+        killEnabled = parent.killEnabled, isFailedStage = false)
+    val pendingOrSkippedStagesTable =
+      new StageTableBase(store, request, pendingOrSkippedStages, pendingOrSkippedTableId,
+        "pendingStage", parent.basePath, basePath, parent.isFairScheduler,
+        killEnabled = false, isFailedStage = false)
+    val completedStagesTable =
+      new StageTableBase(store, request, completedStages, "completed", "completedStage",
+        parent.basePath, basePath, parent.isFairScheduler,
+        killEnabled = false, isFailedStage = false)
+    val failedStagesTable =
+      new StageTableBase(store, request, failedStages, "failed", "failedStage", parent.basePath,
+        basePath, parent.isFairScheduler,
+        killEnabled = false, isFailedStage = true)
 
-      val shouldShowActiveStages = activeStages.nonEmpty
-      val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
-      val shouldShowCompletedStages = completedStages.nonEmpty
-      val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
-      val shouldShowFailedStages = failedStages.nonEmpty
+    val shouldShowActiveStages = activeStages.nonEmpty
+    val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
+    val shouldShowCompletedStages = completedStages.nonEmpty
+    val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
+    val shouldShowFailedStages = failedStages.nonEmpty
 
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            <li>
-              <Strong>Status:</Strong>
-              {jobData.status}
-            </li>
-            {
-              if (jobData.jobGroup.isDefined) {
-                <li>
-                  <strong>Job Group:</strong>
-                  {jobData.jobGroup.get}
-                </li>
-              }
-            }
-            {
-              if (shouldShowActiveStages) {
-                <li>
-                  <a href="#active"><strong>Active Stages:</strong></a>
-                  {activeStages.size}
-                </li>
-              }
-            }
-            {
-              if (shouldShowPendingStages) {
-                <li>
-                  <a href="#pending">
-                    <strong>Pending Stages:</strong>
-                  </a>{pendingOrSkippedStages.size}
-                </li>
-              }
+    val summary: NodeSeq =
+      <div>
+        <ul class="unstyled">
+          <li>
+            <Strong>Status:</Strong>
+            {jobData.status}
+          </li>
+          {
+            if (jobData.jobGroup.isDefined) {
+              <li>
+                <strong>Job Group:</strong>
+                {jobData.jobGroup.get}
+              </li>
             }
-            {
-              if (shouldShowCompletedStages) {
-                <li>
-                  <a href="#completed"><strong>Completed Stages:</strong></a>
-                  {completedStages.size}
-                </li>
-              }
+          }
+          {
+            if (shouldShowActiveStages) {
+              <li>
+                <a href="#active"><strong>Active Stages:</strong></a>
+                {activeStages.size}
+              </li>
             }
-            {
-              if (shouldShowSkippedStages) {
+          }
+          {
+            if (shouldShowPendingStages) {
               <li>
-                <a href="#skipped"><strong>Skipped Stages:</strong></a>
-                {pendingOrSkippedStages.size}
+                <a href="#pending">
+                  <strong>Pending Stages:</strong>
+                </a>{pendingOrSkippedStages.size}
               </li>
             }
+          }
+          {
+            if (shouldShowCompletedStages) {
+              <li>
+                <a href="#completed"><strong>Completed Stages:</strong></a>
+                {completedStages.size}
+              </li>
             }
-            {
-              if (shouldShowFailedStages) {
-                <li>
-                  <a href="#failed"><strong>Failed Stages:</strong></a>
-                  {failedStages.size}
-                </li>
-              }
+          }
+          {
+            if (shouldShowSkippedStages) {
+            <li>
+              <a href="#skipped"><strong>Skipped Stages:</strong></a>
+              {pendingOrSkippedStages.size}
+            </li>
+          }
+          }
+          {
+            if (shouldShowFailedStages) {
+              <li>
+                <a href="#failed"><strong>Failed Stages:</strong></a>
+                {failedStages.size}
+              </li>
             }
-          </ul>
-        </div>
+          }
+        </ul>
+      </div>
 
-      var content = summary
-      val appStartTime = listener.startTime
-      val operationGraphListener = parent.operationGraphListener
+    var content = summary
+    val appStartTime = store.applicationInfo().attempts.head.startTime.getTime()
 
-      content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
-          parent.parent.store.executorList(false), appStartTime)
+    content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
+      store.executorList(false), appStartTime)
 
-      content ++= UIUtils.showDagVizForJob(
-        jobId, operationGraphListener.getOperationGraphForJob(jobId))
+    content ++= UIUtils.showDagVizForJob(
+      jobId, store.operationGraphForJob(jobId))
 
-      if (shouldShowActiveStages) {
-        content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-          activeStagesTable.toNodeSeq
-      }
-      if (shouldShowPendingStages) {
-        content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
-          pendingOrSkippedStagesTable.toNodeSeq
-      }
-      if (shouldShowCompletedStages) {
-        content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
-          completedStagesTable.toNodeSeq
-      }
-      if (shouldShowSkippedStages) {
-        content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
-          pendingOrSkippedStagesTable.toNodeSeq
-      }
-      if (shouldShowFailedStages) {
-        content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
-          failedStagesTable.toNodeSeq
-      }
-      UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
+    if (shouldShowActiveStages) {
+      content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+        activeStagesTable.toNodeSeq
+    }
+    if (shouldShowPendingStages) {
+      content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
+        pendingOrSkippedStagesTable.toNodeSeq
+    }
+    if (shouldShowCompletedStages) {
+      content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+        completedStagesTable.toNodeSeq
+    }
+    if (shouldShowSkippedStages) {
+      content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
+        pendingOrSkippedStagesTable.toNodeSeq
+    }
+    if (shouldShowFailedStages) {
+      content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
+        failedStagesTable.toNodeSeq
     }
+    UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 81ffe04..99eab1b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -19,35 +19,45 @@ package org.apache.spark.ui.jobs
 
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils}
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.ui._
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") {
+private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
+  extends SparkUITab(parent, "jobs") {
+
   val sc = parent.sc
   val killEnabled = parent.killEnabled
-  val jobProgresslistener = parent.jobProgressListener
-  val operationGraphListener = parent.operationGraphListener
 
-  def isFairScheduler: Boolean =
-    jobProgresslistener.schedulingMode == Some(SchedulingMode.FAIR)
+  def isFairScheduler: Boolean = {
+    // The property value is a String, so compare it with the enum's name, not the enum itself.
+    store.environmentInfo().sparkProperties.toMap
+      .get("spark.scheduler.mode")
+      .exists(_ == SchedulingMode.FAIR.toString)
+  }
 
   def getSparkUser: String = parent.getSparkUser
 
-  attachPage(new AllJobsPage(this))
-  attachPage(new JobPage(this))
+  attachPage(new AllJobsPage(this, store))
+  attachPage(new JobPage(this, store))
 
   def handleKillRequest(request: HttpServletRequest): Unit = {
     if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
       // stripXSS is called first to remove suspicious characters used in XSS attacks
       val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
       jobId.foreach { id =>
-        if (jobProgresslistener.activeJobs.contains(id)) {
-          sc.foreach(_.cancelJob(id))
-          // Do a quick pause here to give Spark time to kill the job so it shows up as
-          // killed after the refresh. Note that this will block the serving thread so the
-          // time should be limited in duration.
-          Thread.sleep(100)
+        store.asOption(store.job(id)).foreach { job =>
+          if (job.status == JobExecutionStatus.RUNNING) {
+            sc.foreach(_.cancelJob(id))
+            // Do a quick pause here to give Spark time to kill the job so it shows up as
+            // killed after the refresh. Note that this will block the serving thread so the
+            // time should be limited in duration.
+            Thread.sleep(100)
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 4b8c7b2..98fbd7a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -21,46 +21,39 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.PoolData
+import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
-  private val sc = parent.sc
-  private val listener = parent.progressListener
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      // stripXSS is called first to remove suspicious characters used in XSS attacks
-      val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
-        UIUtils.decodeURLParameter(poolname)
-      }.getOrElse {
-        throw new IllegalArgumentException(s"Missing poolname parameter")
-      }
-
-      val poolToActiveStages = listener.poolToActiveStages
-      val activeStages = poolToActiveStages.get(poolName) match {
-        case Some(s) => s.values.toSeq
-        case None => Seq.empty[StageInfo]
-      }
-      val shouldShowActiveStages = activeStages.nonEmpty
-      val activeStagesTable =
-        new StageTableBase(request, activeStages, "", "activeStage", parent.basePath, "stages/pool",
-          parent.progressListener, parent.isFairScheduler, parent.killEnabled,
-          isFailedStage = false)
-
-      // For now, pool information is only accessible in live UIs
-      val pools = sc.map(_.getPoolForName(poolName).getOrElse {
-        throw new IllegalArgumentException(s"Unknown poolname: $poolName")
-      }).toSeq
-      val poolTable = new PoolTable(pools, parent)
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
+      UIUtils.decodeURLParameter(poolname)
+    }.getOrElse {
+      throw new IllegalArgumentException(s"Missing poolname parameter")
+    }
 
-      var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
-      if (shouldShowActiveStages) {
-        content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq
-      }
+    // For now, pool information is only accessible in live UIs
+    val pool = parent.sc.flatMap(_.getPoolForName(poolName)).getOrElse {
+      throw new IllegalArgumentException(s"Unknown pool: $poolName")
+    }
 
-      UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
+    val uiPool = parent.store.asOption(parent.store.pool(poolName)).getOrElse(
+      new PoolData(poolName, Set()))
+    val activeStages = uiPool.stageIds.toSeq.map(parent.store.lastStageAttempt(_))
+    val activeStagesTable =
+      new StageTableBase(parent.store, request, activeStages, "", "activeStage", parent.basePath,
+        "stages/pool", parent.isFairScheduler, parent.killEnabled, false)
+
+    val poolTable = new PoolTable(Map(pool -> uiPool), parent)
+    var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
+    if (activeStages.nonEmpty) {
+      content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq
     }
+
+    UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index ea02968..5dfce85 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -19,25 +19,16 @@ package org.apache.spark.ui.jobs
 
 import java.net.URLEncoder
 
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{Schedulable, StageInfo}
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.status.PoolData
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
-  private val listener = parent.progressListener
+private[ui] class PoolTable(pools: Map[Schedulable, PoolData], parent: StagesTab) {
 
   def toNodeSeq: Seq[Node] = {
-    listener.synchronized {
-      poolTable(poolRow, pools)
-    }
-  }
-
-  private def poolTable(
-      makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node],
-      rows: Seq[Schedulable]): Seq[Node] = {
     <table class="table table-bordered table-striped table-condensed sortable table-fixed">
       <thead>
         <th>Pool Name</th>
@@ -48,29 +39,24 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
         <th>SchedulingMode</th>
       </thead>
       <tbody>
-        {rows.map(r => makeRow(r, listener.poolToActiveStages))}
+        {pools.map { case (s, p) => poolRow(s, p) }}
       </tbody>
     </table>
   }
 
-  private def poolRow(
-      p: Schedulable,
-      poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = {
-    val activeStages = poolToActiveStages.get(p.name) match {
-      case Some(stages) => stages.size
-      case None => 0
-    }
+  private def poolRow(s: Schedulable, p: PoolData): Seq[Node] = {
+    val activeStages = p.stageIds.size
     val href = "%s/stages/pool?poolname=%s"
       .format(UIUtils.prependBaseUri(parent.basePath), URLEncoder.encode(p.name, "UTF-8"))
     <tr>
       <td>
         <a href={href}>{p.name}</a>
       </td>
-      <td>{p.minShare}</td>
-      <td>{p.weight}</td>
+      <td>{s.minShare}</td>
+      <td>{s.weight}</td>
       <td>{activeStages}</td>
-      <td>{p.runningTasks}</td>
-      <td>{p.schedulingMode}</td>
+      <td>{s.runningTasks}</td>
+      <td>{s.schedulingMode}</td>
     </tr>
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org