You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2017/11/14 16:34:41 UTC
[4/5] spark git commit: [SPARK-20648][CORE] Port JobsTab and StageTab
to the new UI backend.
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index dc5b03c..e4cf99e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -22,120 +22,121 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, NodeSeq}
import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.status.PoolData
+import org.apache.spark.status.api.v1._
import org.apache.spark.ui.{UIUtils, WebUIPage}
/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
private val sc = parent.sc
- private val listener = parent.progressListener
private def isFairScheduler = parent.isFairScheduler
def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
- val activeStages = listener.activeStages.values.toSeq
- val pendingStages = listener.pendingStages.values.toSeq
- val completedStages = listener.completedStages.reverse
- val numCompletedStages = listener.numCompletedStages
- val failedStages = listener.failedStages.reverse
- val numFailedStages = listener.numFailedStages
- val subPath = "stages"
+ val allStages = parent.store.stageList(null)
- val activeStagesTable =
- new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath, subPath,
- parent.progressListener, parent.isFairScheduler,
- killEnabled = parent.killEnabled, isFailedStage = false)
- val pendingStagesTable =
- new StageTableBase(request, pendingStages, "pending", "pendingStage", parent.basePath,
- subPath, parent.progressListener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = false)
- val completedStagesTable =
- new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath,
- subPath, parent.progressListener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = false)
- val failedStagesTable =
- new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath, subPath,
- parent.progressListener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = true)
+ val activeStages = allStages.filter(_.status == StageStatus.ACTIVE)
+ val pendingStages = allStages.filter(_.status == StageStatus.PENDING)
+ val completedStages = allStages.filter(_.status == StageStatus.COMPLETE)
+ val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse
- // For now, pool information is only accessible in live UIs
- val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
- val poolTable = new PoolTable(pools, parent)
+ val numCompletedStages = completedStages.size
+ val numFailedStages = failedStages.size
+ val subPath = "stages"
- val shouldShowActiveStages = activeStages.nonEmpty
- val shouldShowPendingStages = pendingStages.nonEmpty
- val shouldShowCompletedStages = completedStages.nonEmpty
- val shouldShowFailedStages = failedStages.nonEmpty
+ val activeStagesTable =
+ new StageTableBase(parent.store, request, activeStages, "active", "activeStage",
+ parent.basePath, subPath, parent.isFairScheduler, parent.killEnabled, false)
+ val pendingStagesTable =
+ new StageTableBase(parent.store, request, pendingStages, "pending", "pendingStage",
+ parent.basePath, subPath, parent.isFairScheduler, false, false)
+ val completedStagesTable =
+ new StageTableBase(parent.store, request, completedStages, "completed", "completedStage",
+ parent.basePath, subPath, parent.isFairScheduler, false, false)
+ val failedStagesTable =
+ new StageTableBase(parent.store, request, failedStages, "failed", "failedStage",
+ parent.basePath, subPath, parent.isFairScheduler, false, true)
- val completedStageNumStr = if (numCompletedStages == completedStages.size) {
- s"$numCompletedStages"
- } else {
- s"$numCompletedStages, only showing ${completedStages.size}"
- }
+ // For now, pool information is only accessible in live UIs
+ val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]).map { pool =>
+ val uiPool = parent.store.asOption(parent.store.pool(pool.name)).getOrElse(
+ new PoolData(pool.name, Set()))
+ pool -> uiPool
+ }.toMap
+ val poolTable = new PoolTable(pools, parent)
+
+ val shouldShowActiveStages = activeStages.nonEmpty
+ val shouldShowPendingStages = pendingStages.nonEmpty
+ val shouldShowCompletedStages = completedStages.nonEmpty
+ val shouldShowFailedStages = failedStages.nonEmpty
+
+ val completedStageNumStr = if (numCompletedStages == completedStages.size) {
+ s"$numCompletedStages"
+ } else {
+ s"$numCompletedStages, only showing ${completedStages.size}"
+ }
- val summary: NodeSeq =
- <div>
- <ul class="unstyled">
- {
- if (shouldShowActiveStages) {
- <li>
- <a href="#active"><strong>Active Stages:</strong></a>
- {activeStages.size}
- </li>
- }
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ {
+ if (shouldShowActiveStages) {
+ <li>
+ <a href="#active"><strong>Active Stages:</strong></a>
+ {activeStages.size}
+ </li>
}
- {
- if (shouldShowPendingStages) {
- <li>
- <a href="#pending"><strong>Pending Stages:</strong></a>
- {pendingStages.size}
- </li>
- }
+ }
+ {
+ if (shouldShowPendingStages) {
+ <li>
+ <a href="#pending"><strong>Pending Stages:</strong></a>
+ {pendingStages.size}
+ </li>
}
- {
- if (shouldShowCompletedStages) {
- <li id="completed-summary">
- <a href="#completed"><strong>Completed Stages:</strong></a>
- {completedStageNumStr}
- </li>
- }
+ }
+ {
+ if (shouldShowCompletedStages) {
+ <li id="completed-summary">
+ <a href="#completed"><strong>Completed Stages:</strong></a>
+ {completedStageNumStr}
+ </li>
}
- {
- if (shouldShowFailedStages) {
- <li>
- <a href="#failed"><strong>Failed Stages:</strong></a>
- {numFailedStages}
- </li>
- }
+ }
+ {
+ if (shouldShowFailedStages) {
+ <li>
+ <a href="#failed"><strong>Failed Stages:</strong></a>
+ {numFailedStages}
+ </li>
}
- </ul>
- </div>
-
- var content = summary ++
- {
- if (sc.isDefined && isFairScheduler) {
- <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq
- } else {
- Seq.empty[Node]
}
+ </ul>
+ </div>
+
+ var content = summary ++
+ {
+ if (sc.isDefined && isFairScheduler) {
+ <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq
+ } else {
+ Seq.empty[Node]
}
- if (shouldShowActiveStages) {
- content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
- activeStagesTable.toNodeSeq
- }
- if (shouldShowPendingStages) {
- content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
- pendingStagesTable.toNodeSeq
- }
- if (shouldShowCompletedStages) {
- content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++
- completedStagesTable.toNodeSeq
- }
- if (shouldShowFailedStages) {
- content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
- failedStagesTable.toNodeSeq
}
- UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
+ if (shouldShowActiveStages) {
+ content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+ activeStagesTable.toNodeSeq
}
+ if (shouldShowPendingStages) {
+ content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
+ pendingStagesTable.toNodeSeq
+ }
+ if (shouldShowCompletedStages) {
+ content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++
+ completedStagesTable.toNodeSeq
+ }
+ if (shouldShowFailedStages) {
+ content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
+ failedStagesTable.toNodeSeq
+ }
+ UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
}
}
-
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 07a41d1..41d42b5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,44 +17,19 @@
package org.apache.spark.ui.jobs
-import scala.collection.mutable
import scala.xml.{Node, Unparsed}
import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.StageData
import org.apache.spark.ui.{ToolTips, UIUtils}
-import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils
/** Stage summary grouped by executors. */
-private[ui] class ExecutorTable(
- stageId: Int,
- stageAttemptId: Int,
- parent: StagesTab,
- store: AppStatusStore) {
- private val listener = parent.progressListener
+private[ui] class ExecutorTable(stage: StageData, store: AppStatusStore) {
- def toNodeSeq: Seq[Node] = {
- listener.synchronized {
- executorTable()
- }
- }
-
- /** Special table which merges two header cells. */
- private def executorTable[T](): Seq[Node] = {
- val stageData = listener.stageIdToData.get((stageId, stageAttemptId))
- var hasInput = false
- var hasOutput = false
- var hasShuffleWrite = false
- var hasShuffleRead = false
- var hasBytesSpilled = false
- stageData.foreach { data =>
- hasInput = data.hasInput
- hasOutput = data.hasOutput
- hasShuffleRead = data.hasShuffleRead
- hasShuffleWrite = data.hasShuffleWrite
- hasBytesSpilled = data.hasBytesSpilled
- }
+ import ApiHelper._
+ def toNodeSeq: Seq[Node] = {
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th id="executorid">Executor ID</th>
@@ -64,29 +39,29 @@ private[ui] class ExecutorTable(
<th>Failed Tasks</th>
<th>Killed Tasks</th>
<th>Succeeded Tasks</th>
- {if (hasInput) {
+ {if (hasInput(stage)) {
<th>
<span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / Records</span>
</th>
}}
- {if (hasOutput) {
+ {if (hasOutput(stage)) {
<th>
<span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / Records</span>
</th>
}}
- {if (hasShuffleRead) {
+ {if (hasShuffleRead(stage)) {
<th>
<span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>
Shuffle Read Size / Records</span>
</th>
}}
- {if (hasShuffleWrite) {
+ {if (hasShuffleWrite(stage)) {
<th>
<span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>
Shuffle Write Size / Records</span>
</th>
}}
- {if (hasBytesSpilled) {
+ {if (hasBytesSpilled(stage)) {
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
}}
@@ -97,7 +72,7 @@ private[ui] class ExecutorTable(
</th>
</thead>
<tbody>
- {createExecutorTable()}
+ {createExecutorTable(stage)}
</tbody>
</table>
<script>
@@ -111,68 +86,57 @@ private[ui] class ExecutorTable(
</script>
}
- private def createExecutorTable() : Seq[Node] = {
- // Make an executor-id -> address map
- val executorIdToAddress = mutable.HashMap[String, String]()
- listener.blockManagerIds.foreach { blockManagerId =>
- val address = blockManagerId.hostPort
- val executorId = blockManagerId.executorId
- executorIdToAddress.put(executorId, address)
- }
-
- listener.stageIdToData.get((stageId, stageAttemptId)) match {
- case Some(stageData: StageUIData) =>
- stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
- <tr>
- <td>
- <div style="float: left">{k}</div>
- <div style="float: right">
- {
- store.executorSummary(k).map(_.executorLogs).getOrElse(Map.empty).map {
- case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
- }
- }
- </div>
- </td>
- <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
- <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
- <td>{v.failedTasks + v.succeededTasks + v.reasonToNumKilled.values.sum}</td>
- <td>{v.failedTasks}</td>
- <td>{v.reasonToNumKilled.values.sum}</td>
- <td>{v.succeededTasks}</td>
- {if (stageData.hasInput) {
- <td sorttable_customkey={v.inputBytes.toString}>
- {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
- </td>
- }}
- {if (stageData.hasOutput) {
- <td sorttable_customkey={v.outputBytes.toString}>
- {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
- </td>
- }}
- {if (stageData.hasShuffleRead) {
- <td sorttable_customkey={v.shuffleRead.toString}>
- {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"}
- </td>
- }}
- {if (stageData.hasShuffleWrite) {
- <td sorttable_customkey={v.shuffleWrite.toString}>
- {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
- </td>
- }}
- {if (stageData.hasBytesSpilled) {
- <td sorttable_customkey={v.memoryBytesSpilled.toString}>
- {Utils.bytesToString(v.memoryBytesSpilled)}
- </td>
- <td sorttable_customkey={v.diskBytesSpilled.toString}>
- {Utils.bytesToString(v.diskBytesSpilled)}
- </td>
- }}
- <td>{v.isBlacklisted}</td>
- </tr>
- }
- case None =>
- Seq.empty[Node]
+ private def createExecutorTable(stage: StageData) : Seq[Node] = {
+ stage.executorSummary.getOrElse(Map.empty).toSeq.sortBy(_._1).map { case (k, v) =>
+ val executor = store.asOption(store.executorSummary(k))
+ <tr>
+ <td>
+ <div style="float: left">{k}</div>
+ <div style="float: right">
+ {
+ executor.map(_.executorLogs).getOrElse(Map.empty).map {
+ case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
+ }
+ }
+ </div>
+ </td>
+ <td>{executor.map { e => e.hostPort }.getOrElse("CANNOT FIND ADDRESS")}</td>
+ <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
+ <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.killedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ {if (hasInput(stage)) {
+ <td sorttable_customkey={v.inputBytes.toString}>
+ {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}
+ </td>
+ }}
+ {if (hasOutput(stage)) {
+ <td sorttable_customkey={v.outputBytes.toString}>
+ {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"}
+ </td>
+ }}
+ {if (hasShuffleRead(stage)) {
+ <td sorttable_customkey={v.shuffleRead.toString}>
+ {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"}
+ </td>
+ }}
+ {if (hasShuffleWrite(stage)) {
+ <td sorttable_customkey={v.shuffleWrite.toString}>
+ {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
+ </td>
+ }}
+ {if (hasBytesSpilled(stage)) {
+ <td sorttable_customkey={v.memoryBytesSpilled.toString}>
+ {Utils.bytesToString(v.memoryBytesSpilled)}
+ </td>
+ <td sorttable_customkey={v.diskBytesSpilled.toString}>
+ {Utils.bytesToString(v.diskBytesSpilled)}
+ </td>
+ }}
+ <td>{executor.map(_.isBlacklisted).getOrElse(false)}</td>
+ </tr>
}
}
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 7ed0164..740f12e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.jobs
-import java.util.{Date, Locale}
+import java.util.Locale
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.{Buffer, ListBuffer}
@@ -27,11 +27,12 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler._
-import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.ui._
/** Page showing statistics and stage list for a given job */
-private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
+private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("job") {
private val STAGES_LEGEND =
<div class="legend-area"><svg width="150px" height="85px">
@@ -56,14 +57,15 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
<text x="35px" y="42px">Removed</text>
</svg></div>.toString.filter(_ != '\n')
- private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = {
+ private def makeStageEvent(stageInfos: Seq[v1.StageData]): Seq[String] = {
stageInfos.map { stage =>
val stageId = stage.stageId
val attemptId = stage.attemptId
val name = stage.name
- val status = stage.getStatusString
- val submissionTime = stage.submissionTime.get
- val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis())
+ val status = stage.status.toString
+ val submissionTime = stage.submissionTime.get.getTime()
+ val completionTime = stage.completionTime.map(_.getTime())
+ .getOrElse(System.currentTimeMillis())
// The timeline library treats contents as HTML, so we have to escape them. We need to add
// extra layers of escaping in order to embed this in a Javascript string literal.
@@ -79,10 +81,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
| 'data-placement="top" data-html="true"' +
| 'data-title="${jsEscapedName} (Stage ${stageId}.${attemptId})<br>' +
| 'Status: ${status.toUpperCase(Locale.ROOT)}<br>' +
- | 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
+ | 'Submitted: ${UIUtils.formatDate(submissionTime)}' +
| '${
if (status != "running") {
- s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}"""
+ s"""<br>Completed: ${UIUtils.formatDate(completionTime)}"""
} else {
""
}
@@ -93,7 +95,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
}
- def makeExecutorEvent(executors: Seq[ExecutorSummary]): Seq[String] = {
+ def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]): Seq[String] = {
val events = ListBuffer[String]()
executors.foreach { e =>
val addedEvent =
@@ -137,8 +139,8 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
private def makeTimeline(
- stages: Seq[StageInfo],
- executors: Seq[ExecutorSummary],
+ stages: Seq[v1.StageData],
+ executors: Seq[v1.ExecutorSummary],
appStartTime: Long): Seq[Node] = {
val stageEventJsonAsStrSeq = makeStageEvent(stages)
@@ -182,173 +184,181 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
def render(request: HttpServletRequest): Seq[Node] = {
- val listener = parent.jobProgresslistener
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
- listener.synchronized {
- // stripXSS is called first to remove suspicious characters used in XSS attacks
- val parameterId = UIUtils.stripXSS(request.getParameter("id"))
- require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
-
- val jobId = parameterId.toInt
- val jobDataOption = listener.jobIdToData.get(jobId)
- if (jobDataOption.isEmpty) {
- val content =
- <div id="no-info">
- <p>No information to display for job {jobId}</p>
- </div>
- return UIUtils.headerSparkPage(
- s"Details for Job $jobId", content, parent)
- }
- val jobData = jobDataOption.get
- val isComplete = jobData.status != JobExecutionStatus.RUNNING
- val stages = jobData.stageIds.map { stageId =>
- // This could be empty if the JobProgressListener hasn't received information about the
- // stage or if the stage information has been garbage collected
- listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+ val jobId = parameterId.toInt
+ val jobData = store.asOption(store.job(jobId)).getOrElse {
+ val content =
+ <div id="no-info">
+ <p>No information to display for job {jobId}</p>
+ </div>
+ return UIUtils.headerSparkPage(
+ s"Details for Job $jobId", content, parent)
+ }
+ val isComplete = jobData.status != JobExecutionStatus.RUNNING
+ val stages = jobData.stageIds.map { stageId =>
+ // This could be empty if the listener hasn't received information about the
+ // stage or if the stage information has been garbage collected
+ store.stageData(stageId).lastOption.getOrElse {
+ new v1.StageData(
+ v1.StageStatus.PENDING,
+ stageId,
+ 0, 0, 0, 0, 0, 0, 0,
+ 0L, 0L, None, None, None, None,
+ 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
+ "Unknown",
+ None,
+ "Unknown",
+ null,
+ Nil,
+ Nil,
+ None,
+ None,
+ Map())
}
+ }
- val activeStages = Buffer[StageInfo]()
- val completedStages = Buffer[StageInfo]()
- // If the job is completed, then any pending stages are displayed as "skipped":
- val pendingOrSkippedStages = Buffer[StageInfo]()
- val failedStages = Buffer[StageInfo]()
- for (stage <- stages) {
- if (stage.submissionTime.isEmpty) {
- pendingOrSkippedStages += stage
- } else if (stage.completionTime.isDefined) {
- if (stage.failureReason.isDefined) {
- failedStages += stage
- } else {
- completedStages += stage
- }
+ val activeStages = Buffer[v1.StageData]()
+ val completedStages = Buffer[v1.StageData]()
+ // If the job is completed, then any pending stages are displayed as "skipped":
+ val pendingOrSkippedStages = Buffer[v1.StageData]()
+ val failedStages = Buffer[v1.StageData]()
+ for (stage <- stages) {
+ if (stage.submissionTime.isEmpty) {
+ pendingOrSkippedStages += stage
+ } else if (stage.completionTime.isDefined) {
+ if (stage.status == v1.StageStatus.FAILED) {
+ failedStages += stage
} else {
- activeStages += stage
+ completedStages += stage
}
+ } else {
+ activeStages += stage
}
+ }
- val basePath = "jobs/job"
+ val basePath = "jobs/job"
- val pendingOrSkippedTableId =
- if (isComplete) {
- "pending"
- } else {
- "skipped"
- }
+ val pendingOrSkippedTableId =
+ if (isComplete) {
+ "pending"
+ } else {
+ "skipped"
+ }
- val activeStagesTable =
- new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath,
- basePath, parent.jobProgresslistener, parent.isFairScheduler,
- killEnabled = parent.killEnabled, isFailedStage = false)
- val pendingOrSkippedStagesTable =
- new StageTableBase(request, pendingOrSkippedStages, pendingOrSkippedTableId, "pendingStage",
- parent.basePath, basePath, parent.jobProgresslistener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = false)
- val completedStagesTable =
- new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath,
- basePath, parent.jobProgresslistener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = false)
- val failedStagesTable =
- new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath,
- basePath, parent.jobProgresslistener, parent.isFairScheduler,
- killEnabled = false, isFailedStage = true)
+ val activeStagesTable =
+ new StageTableBase(store, request, activeStages, "active", "activeStage", parent.basePath,
+ basePath, parent.isFairScheduler,
+ killEnabled = parent.killEnabled, isFailedStage = false)
+ val pendingOrSkippedStagesTable =
+ new StageTableBase(store, request, pendingOrSkippedStages, pendingOrSkippedTableId,
+ "pendingStage", parent.basePath, basePath, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
+ val completedStagesTable =
+ new StageTableBase(store, request, completedStages, "completed", "completedStage",
+ parent.basePath, basePath, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
+ val failedStagesTable =
+ new StageTableBase(store, request, failedStages, "failed", "failedStage", parent.basePath,
+ basePath, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = true)
- val shouldShowActiveStages = activeStages.nonEmpty
- val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
- val shouldShowCompletedStages = completedStages.nonEmpty
- val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
- val shouldShowFailedStages = failedStages.nonEmpty
+ val shouldShowActiveStages = activeStages.nonEmpty
+ val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
+ val shouldShowCompletedStages = completedStages.nonEmpty
+ val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
+ val shouldShowFailedStages = failedStages.nonEmpty
- val summary: NodeSeq =
- <div>
- <ul class="unstyled">
- <li>
- <Strong>Status:</Strong>
- {jobData.status}
- </li>
- {
- if (jobData.jobGroup.isDefined) {
- <li>
- <strong>Job Group:</strong>
- {jobData.jobGroup.get}
- </li>
- }
- }
- {
- if (shouldShowActiveStages) {
- <li>
- <a href="#active"><strong>Active Stages:</strong></a>
- {activeStages.size}
- </li>
- }
- }
- {
- if (shouldShowPendingStages) {
- <li>
- <a href="#pending">
- <strong>Pending Stages:</strong>
- </a>{pendingOrSkippedStages.size}
- </li>
- }
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <Strong>Status:</Strong>
+ {jobData.status}
+ </li>
+ {
+ if (jobData.jobGroup.isDefined) {
+ <li>
+ <strong>Job Group:</strong>
+ {jobData.jobGroup.get}
+ </li>
}
- {
- if (shouldShowCompletedStages) {
- <li>
- <a href="#completed"><strong>Completed Stages:</strong></a>
- {completedStages.size}
- </li>
- }
+ }
+ {
+ if (shouldShowActiveStages) {
+ <li>
+ <a href="#active"><strong>Active Stages:</strong></a>
+ {activeStages.size}
+ </li>
}
- {
- if (shouldShowSkippedStages) {
+ }
+ {
+ if (shouldShowPendingStages) {
<li>
- <a href="#skipped"><strong>Skipped Stages:</strong></a>
- {pendingOrSkippedStages.size}
+ <a href="#pending">
+ <strong>Pending Stages:</strong>
+ </a>{pendingOrSkippedStages.size}
</li>
}
+ }
+ {
+ if (shouldShowCompletedStages) {
+ <li>
+ <a href="#completed"><strong>Completed Stages:</strong></a>
+ {completedStages.size}
+ </li>
}
- {
- if (shouldShowFailedStages) {
- <li>
- <a href="#failed"><strong>Failed Stages:</strong></a>
- {failedStages.size}
- </li>
- }
+ }
+ {
+ if (shouldShowSkippedStages) {
+ <li>
+ <a href="#skipped"><strong>Skipped Stages:</strong></a>
+ {pendingOrSkippedStages.size}
+ </li>
+ }
+ }
+ {
+ if (shouldShowFailedStages) {
+ <li>
+ <a href="#failed"><strong>Failed Stages:</strong></a>
+ {failedStages.size}
+ </li>
}
- </ul>
- </div>
+ }
+ </ul>
+ </div>
- var content = summary
- val appStartTime = listener.startTime
- val operationGraphListener = parent.operationGraphListener
+ var content = summary
+ val appStartTime = store.applicationInfo().attempts.head.startTime.getTime()
- content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
- parent.parent.store.executorList(false), appStartTime)
+ content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
+ store.executorList(false), appStartTime)
- content ++= UIUtils.showDagVizForJob(
- jobId, operationGraphListener.getOperationGraphForJob(jobId))
+ content ++= UIUtils.showDagVizForJob(
+ jobId, store.operationGraphForJob(jobId))
- if (shouldShowActiveStages) {
- content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
- activeStagesTable.toNodeSeq
- }
- if (shouldShowPendingStages) {
- content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
- pendingOrSkippedStagesTable.toNodeSeq
- }
- if (shouldShowCompletedStages) {
- content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
- completedStagesTable.toNodeSeq
- }
- if (shouldShowSkippedStages) {
- content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
- pendingOrSkippedStagesTable.toNodeSeq
- }
- if (shouldShowFailedStages) {
- content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
- failedStagesTable.toNodeSeq
- }
- UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
+ if (shouldShowActiveStages) {
+ content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+ activeStagesTable.toNodeSeq
+ }
+ if (shouldShowPendingStages) {
+ content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
+ pendingOrSkippedStagesTable.toNodeSeq
+ }
+ if (shouldShowCompletedStages) {
+ content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+ completedStagesTable.toNodeSeq
+ }
+ if (shouldShowSkippedStages) {
+ content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
+ pendingOrSkippedStagesTable.toNodeSeq
+ }
+ if (shouldShowFailedStages) {
+ content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
+ failedStagesTable.toNodeSeq
}
+ UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 81ffe04..99eab1b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -19,35 +19,45 @@ package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils}
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.ui._
/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") {
+private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
+ extends SparkUITab(parent, "jobs") {
+
val sc = parent.sc
val killEnabled = parent.killEnabled
- val jobProgresslistener = parent.jobProgressListener
- val operationGraphListener = parent.operationGraphListener
- def isFairScheduler: Boolean =
- jobProgresslistener.schedulingMode == Some(SchedulingMode.FAIR)
+ /**
+ * Whether the application is configured with the FAIR scheduling mode, judged from the
+ * "spark.scheduler.mode" entry of the Spark properties exposed by the status store.
+ *
+ * @return true only when the property is present and equals the name of SchedulingMode.FAIR;
+ * false when it is absent or holds any other value.
+ */
+ def isFairScheduler: Boolean = {
+ // The stored property value is a String while SchedulingMode.FAIR is an enumeration value,
+ // so comparing the two directly with == can never be true. Compare against the value's name.
+ store.environmentInfo().sparkProperties.toMap
+ .get("spark.scheduler.mode")
+ .exists { mode => mode == SchedulingMode.FAIR.toString }
+ }
def getSparkUser: String = parent.getSparkUser
- attachPage(new AllJobsPage(this))
- attachPage(new JobPage(this))
+ attachPage(new AllJobsPage(this, store))
+ attachPage(new JobPage(this, store))
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
// stripXSS is called first to remove suspicious characters used in XSS attacks
val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
jobId.foreach { id =>
- if (jobProgresslistener.activeJobs.contains(id)) {
- sc.foreach(_.cancelJob(id))
- // Do a quick pause here to give Spark time to kill the job so it shows up as
- // killed after the refresh. Note that this will block the serving thread so the
- // time should be limited in duration.
- Thread.sleep(100)
+ store.asOption(store.job(id)).foreach { job =>
+ if (job.status == JobExecutionStatus.RUNNING) {
+ sc.foreach(_.cancelJob(id))
+ // Do a quick pause here to give Spark time to kill the job so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 4b8c7b2..98fbd7a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -21,46 +21,39 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.PoolData
+import org.apache.spark.status.api.v1._
import org.apache.spark.ui.{UIUtils, WebUIPage}
/** Page showing specific pool details */
private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
- private val sc = parent.sc
- private val listener = parent.progressListener
def render(request: HttpServletRequest): Seq[Node] = {
- listener.synchronized {
- // stripXSS is called first to remove suspicious characters used in XSS attacks
- val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
- UIUtils.decodeURLParameter(poolname)
- }.getOrElse {
- throw new IllegalArgumentException(s"Missing poolname parameter")
- }
-
- val poolToActiveStages = listener.poolToActiveStages
- val activeStages = poolToActiveStages.get(poolName) match {
- case Some(s) => s.values.toSeq
- case None => Seq.empty[StageInfo]
- }
- val shouldShowActiveStages = activeStages.nonEmpty
- val activeStagesTable =
- new StageTableBase(request, activeStages, "", "activeStage", parent.basePath, "stages/pool",
- parent.progressListener, parent.isFairScheduler, parent.killEnabled,
- isFailedStage = false)
-
- // For now, pool information is only accessible in live UIs
- val pools = sc.map(_.getPoolForName(poolName).getOrElse {
- throw new IllegalArgumentException(s"Unknown poolname: $poolName")
- }).toSeq
- val poolTable = new PoolTable(pools, parent)
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
+ UIUtils.decodeURLParameter(poolname)
+ }.getOrElse {
+ throw new IllegalArgumentException(s"Missing poolname parameter")
+ }
- var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
- if (shouldShowActiveStages) {
- content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq
- }
+ // For now, pool information is only accessible in live UIs
+ val pool = parent.sc.flatMap(_.getPoolForName(poolName)).getOrElse {
+ throw new IllegalArgumentException(s"Unknown pool: $poolName")
+ }
- UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
+ val uiPool = parent.store.asOption(parent.store.pool(poolName)).getOrElse(
+ new PoolData(poolName, Set()))
+ val activeStages = uiPool.stageIds.toSeq.map(parent.store.lastStageAttempt(_))
+ val activeStagesTable =
+ new StageTableBase(parent.store, request, activeStages, "", "activeStage", parent.basePath,
+ "stages/pool", parent.isFairScheduler, parent.killEnabled, false)
+
+ val poolTable = new PoolTable(Map(pool -> uiPool), parent)
+ var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
+ if (activeStages.nonEmpty) {
+ content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq
}
+
+ UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index ea02968..5dfce85 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -19,25 +19,16 @@ package org.apache.spark.ui.jobs
import java.net.URLEncoder
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.apache.spark.scheduler.{Schedulable, StageInfo}
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.status.PoolData
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
- private val listener = parent.progressListener
+private[ui] class PoolTable(pools: Map[Schedulable, PoolData], parent: StagesTab) {
def toNodeSeq: Seq[Node] = {
- listener.synchronized {
- poolTable(poolRow, pools)
- }
- }
-
- private def poolTable(
- makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node],
- rows: Seq[Schedulable]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable table-fixed">
<thead>
<th>Pool Name</th>
@@ -48,29 +39,24 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
<th>SchedulingMode</th>
</thead>
<tbody>
- {rows.map(r => makeRow(r, listener.poolToActiveStages))}
+ {pools.map { case (s, p) => poolRow(s, p) }}
</tbody>
</table>
}
- private def poolRow(
- p: Schedulable,
- poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = {
- val activeStages = poolToActiveStages.get(p.name) match {
- case Some(stages) => stages.size
- case None => 0
- }
+ private def poolRow(s: Schedulable, p: PoolData): Seq[Node] = {
+ val activeStages = p.stageIds.size
val href = "%s/stages/pool?poolname=%s"
.format(UIUtils.prependBaseUri(parent.basePath), URLEncoder.encode(p.name, "UTF-8"))
<tr>
<td>
<a href={href}>{p.name}</a>
</td>
- <td>{p.minShare}</td>
- <td>{p.weight}</td>
+ <td>{s.minShare}</td>
+ <td>{s.weight}</td>
<td>{activeStages}</td>
- <td>{p.runningTasks}</td>
- <td>{p.schedulingMode}</td>
+ <td>{s.runningTasks}</td>
+ <td>{s.schedulingMode}</td>
</tr>
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org