You are viewing a plain-text version of this content. (The link to the canonical version of this message was lost in this plain-text copy.)
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/23 19:38:47 UTC
[5/9] git commit: spark-898, changes according to review comments
spark-898, changes according to review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/36060f4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/36060f4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/36060f4f
Branch: refs/heads/master
Commit: 36060f4f50ead2632117bb12e8c5bc1fb4f91f1e
Parents: 8ab8c6a
Author: wangda.tan <wh...@gmail.com>
Authored: Tue Dec 17 17:55:38 2013 +0800
Committer: wangda.tan <wh...@gmail.com>
Committed: Tue Dec 17 17:55:38 2013 +0800
----------------------------------------------------------------------
.../org/apache/spark/ui/exec/ExecutorsUI.scala | 39 +++++++++++++++--
.../apache/spark/ui/jobs/ExecutorSummary.scala | 3 +-
.../apache/spark/ui/jobs/ExecutorTable.scala | 40 +++++++++--------
.../org/apache/spark/ui/jobs/IndexPage.scala | 5 +--
.../spark/ui/jobs/JobProgressListener.scala | 31 ++++++++------
.../org/apache/spark/ui/jobs/StagePage.scala | 3 +-
.../ui/jobs/JobProgressListenerSuite.scala | 45 +++++---------------
7 files changed, 90 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index e596690..808bbe8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read",
+ "Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
+ <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+ <td>{Utils.bytesToString(kv(11).toLong)}</td>
+ <td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
Seq(
execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -137,9 +150,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks += taskStart.taskInfo
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+ executorToDuration.put(eid, newDuration)
+
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -150,6 +166,23 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
+
+ // update shuffle read/write
+ val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+ shuffleRead match {
+ case Some(s) =>
+ val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
+ executorToShuffleRead.put(eid, newShuffleRead)
+ case _ => {}
+ }
+ val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+ shuffleWrite match {
+ case Some(s) => {
+ val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
+ executorToShuffleWrite.put(eid, newShuffleWrite)
+ }
+ case _ => {}
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index f2ee120..75c0dd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -19,9 +19,8 @@ package org.apache.spark.ui.jobs
private[spark] class ExecutorSummary() {
var duration : Long = 0
- var totalTasks : Int = 0
var failedTasks : Int = 0
- var succeedTasks : Int = 0
+ var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index c6823cd..763d5a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,14 +17,13 @@
package org.apache.spark.ui.jobs
-
import scala.xml.Node
import org.apache.spark.scheduler.SchedulingMode
-
+import org.apache.spark.util.Utils
/** Page showing executor summary */
-private[spark] class ExecutorTable(val parent: JobProgressUI) {
+private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
val listener = parent.listener
val dateFmt = parent.dateFmt
@@ -42,9 +41,9 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) {
<thead>
<th>Executor ID</th>
<th>Duration</th>
- <th>#Tasks</th>
- <th>#Failed Tasks</th>
- <th>#Succeed Tasks</th>
+ <th>Total Tasks</th>
+ <th>Failed Tasks</th>
+ <th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
</thead>
@@ -55,19 +54,24 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) {
}
private def createExecutorTable() : Seq[Node] = {
- val executorIdToSummary = listener.executorIdToSummary
- executorIdToSummary.toSeq.sortBy(_._1).map{
- case (k,v) => {
- <tr>
- <td>{k}</td>
- <td>{v.duration} ms</td>
- <td>{v.totalTasks}</td>
- <td>{v.failedTasks}</td>
- <td>{v.succeedTasks}</td>
- <td>{v.shuffleRead}</td>
- <td>{v.shuffleWrite}</td>
- </tr>
+ val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
+ executorIdToSummary match {
+ case Some(x) => {
+ x.toSeq.sortBy(_._1).map{
+ case (k,v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{parent.formatDuration(v.duration)}</td>
+ <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ <td>{Utils.bytesToString(v.shuffleRead)}</td>
+ <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ </tr>
+ }
+ }
}
+ case _ => { Seq[Node]() }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 653a84b..854afb6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -45,7 +45,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
- val executorTable = new ExecutorTable(parent)
val pools = listener.sc.getAllPools
val poolTable = new PoolTable(pools, listener)
@@ -59,7 +58,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
<li>
<a href="#executors"><strong>Executor Summary:</strong></a>
- {listener.executorIdToSummary.size}
+ {listener.stageIdToExecutorSummaries.size}
</li>
<li>
<a href="#active"><strong>Active Stages:</strong></a>
@@ -82,8 +81,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
} else {
Seq()
}} ++
- <h4 id="executor">Executor Summary</h4> ++
- executorTable.toNodeSeq++
<h4 id="active">Active Stages ({activeStages.size})</h4> ++
activeStagesTable.toNodeSeq++
<h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2635478..8c92ff1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,7 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
- val executorIdToSummary = HashMap[String, ExecutorSummary]()
+ val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -115,9 +115,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageIdToTaskInfos(sid) = taskList
- val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId,
- op = new ExecutorSummary())
- executorSummary.totalTasks += 1
}
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
@@ -127,32 +124,39 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- // update executor summary
- val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId)
+ val sid = taskEnd.task.stageId
+
+ // create executor summary map if necessary
+ val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+ op = new HashMap[String, ExecutorSummary]())
+ executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
+ op = new ExecutorSummary())
+
+ val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
executorSummary match {
- case Some(x) => {
+ case Some(y) => {
// first update failed-task, succeed-task
taskEnd.reason match {
- case e: ExceptionFailure =>
- x.failedTasks += 1
+ case Success =>
+ y.succeededTasks += 1
case _ =>
- x.succeedTasks += 1
+ y.failedTasks += 1
}
// update duration
- x.duration += taskEnd.taskInfo.duration
+ y.duration += taskEnd.taskInfo.duration
// update shuffle read/write
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
- x.shuffleRead += s.remoteBytesRead
+ y.shuffleRead += s.remoteBytesRead
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
- x.shuffleWrite += s.shuffleBytesWritten
+ y.shuffleWrite += s.shuffleBytesWritten
}
case _ => {}
}
@@ -160,7 +164,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
case _ => {}
}
- val sid = taskEnd.task.stageId
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446..c077613 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -160,9 +160,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
-
+ val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
+ <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Tasks</h4> ++ taskTable
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 861d37a..67a57a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -19,26 +19,19 @@ package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
import org.apache.spark.scheduler._
-import org.apache.spark.SparkContext
-import org.apache.spark.Success
+import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.scheduler.SparkListenerTaskStart
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
-class JobProgressListenerSuite extends FunSuite {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
test("test executor id to summary") {
- val sc = new SparkContext("local", "joblogger")
+ val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
// nothing in it
- assert(listener.executorIdToSummary.size == 0)
-
- // launched a task, should get an item in map
- listener.onTaskStart(new SparkListenerTaskStart(
- new ShuffleMapTask(0, null, null, 0, null),
- new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
- assert(listener.executorIdToSummary.size == 1)
+ assert(listener.stageIdToExecutorSummaries.size == 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
@@ -47,20 +40,15 @@ class JobProgressListenerSuite extends FunSuite {
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
- assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000)
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
- assert(listener.executorIdToSummary.size == 1)
-
- // launched a task
- listener.onTaskStart(new SparkListenerTaskStart(
- new ShuffleMapTask(0, null, null, 0, null),
- new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
- assert(listener.executorIdToSummary.size == 1)
+ assert(listener.stageIdToExecutorSummaries.size == 1)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -69,13 +57,8 @@ class JobProgressListenerSuite extends FunSuite {
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
- assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000)
-
- // launched a task in another exec
- listener.onTaskStart(new SparkListenerTaskStart(
- new ShuffleMapTask(0, null, null, 0, null),
- new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)))
- assert(listener.executorIdToSummary.size == 2)
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 2000)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -84,13 +67,7 @@ class JobProgressListenerSuite extends FunSuite {
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
- assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000)
-
- // do finalize
- sc.stop()
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
+ .shuffleRead == 1000)
}
}