Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/23 19:38:47 UTC

[5/9] git commit: spark-898, changes according to review comments

spark-898, changes according to review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/36060f4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/36060f4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/36060f4f

Branch: refs/heads/master
Commit: 36060f4f50ead2632117bb12e8c5bc1fb4f91f1e
Parents: 8ab8c6a
Author: wangda.tan <wh...@gmail.com>
Authored: Tue Dec 17 17:55:38 2013 +0800
Committer: wangda.tan <wh...@gmail.com>
Committed: Tue Dec 17 17:55:38 2013 +0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 39 +++++++++++++++--
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |  3 +-
 .../apache/spark/ui/jobs/ExecutorTable.scala    | 40 +++++++++--------
 .../org/apache/spark/ui/jobs/IndexPage.scala    |  5 +--
 .../spark/ui/jobs/JobProgressListener.scala     | 31 ++++++++------
 .../org/apache/spark/ui/jobs/StagePage.scala    |  3 +-
 .../ui/jobs/JobProgressListenerSuite.scala      | 45 +++++---------------
 7 files changed, 90 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index e596690..808bbe8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
 
     val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
-      "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+      "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read",
+      "Shuffle Write")
 
     def execRow(kv: Seq[String]) = {
       <tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
         <td>{kv(7)}</td>
         <td>{kv(8)}</td>
         <td>{kv(9)}</td>
+        <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+        <td>{Utils.bytesToString(kv(11).toLong)}</td>
+        <td>{Utils.bytesToString(kv(12).toLong)}</td>
       </tr>
     }
 
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
     val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks
+    val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
 
     Seq(
       execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       activeTasks.toString,
       failedTasks.toString,
       completedTasks.toString,
-      totalTasks.toString
+      totalTasks.toString,
+      totalDuration.toString,
+      totalShuffleRead.toString,
+      totalShuffleWrite.toString
     )
   }
 
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
+    val executorToDuration = HashMap[String, Long]()
+    val executorToShuffleRead = HashMap[String, Long]()
+    val executorToShuffleWrite = HashMap[String, Long]()
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
@@ -137,9 +150,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       activeTasks += taskStart.taskInfo
     }
 
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
       val eid = taskEnd.taskInfo.executorId
       val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+      val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+      executorToDuration.put(eid, newDuration)
+
       activeTasks -= taskEnd.taskInfo
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
@@ -150,6 +166,23 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
             executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
             (None, Option(taskEnd.taskMetrics))
         }
+
+      // update shuffle read/write
+      val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+      shuffleRead match {
+        case Some(s) =>
+          val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
+          executorToShuffleRead.put(eid, newShuffleRead)
+        case _ => {}
+      }
+      val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+      shuffleWrite match {
+        case Some(s) => {
+          val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
+          executorToShuffleWrite.put(eid, newShuffleWrite)
+        }
+        case _ => {}
+      }
     }
   }
 }
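
A side note on the accumulation above: the Option-valued shuffle metrics are
matched with empty "case _ => {}" default arms. A behaviorally identical
sketch using Option.foreach (illustration only, not part of this commit; eid
and the two maps are the ones defined in the listener above):

    taskEnd.taskMetrics.shuffleReadMetrics.foreach { s =>
      executorToShuffleRead(eid) = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
    }
    taskEnd.taskMetrics.shuffleWriteMetrics.foreach { s =>
      executorToShuffleWrite(eid) = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
    }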

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index f2ee120..75c0dd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -19,9 +19,8 @@ package org.apache.spark.ui.jobs
 
 private[spark] class ExecutorSummary() {
   var duration : Long = 0
-  var totalTasks : Int = 0
   var failedTasks : Int = 0
-  var succeedTasks : Int = 0
+  var succeededTasks : Int = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
 }
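
With totalTasks removed, the total is derived at render time as
failedTasks + succeededTasks (see ExecutorTable below). A minimal sketch of
how one of these summaries is created and updated, assuming the per-stage
nested map introduced in JobProgressListener further down; the stageId,
execId and taskDurationMs values are hypothetical:

    import scala.collection.mutable.HashMap

    val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
    val stageId = 0            // hypothetical stage id
    val execId = "exe-1"       // hypothetical executor id
    val taskDurationMs = 42L   // hypothetical task duration in ms

    // getOrElseUpdate creates the inner map and the summary on first use
    val summary = stageIdToExecutorSummaries
      .getOrElseUpdate(stageId, HashMap[String, ExecutorSummary]())
      .getOrElseUpdate(execId, new ExecutorSummary())
    summary.succeededTasks += 1
    summary.duration += taskDurationMs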

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index c6823cd..763d5a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ui.jobs
 
-
 import scala.xml.Node
 
 import org.apache.spark.scheduler.SchedulingMode
-
+import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
-private[spark] class ExecutorTable(val parent: JobProgressUI) {
+private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
 
   val listener = parent.listener
   val dateFmt = parent.dateFmt
@@ -42,9 +41,9 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) {
       <thead>
         <th>Executor ID</th>
         <th>Duration</th>
-        <th>#Tasks</th>
-        <th>#Failed Tasks</th>
-        <th>#Succeed Tasks</th>
+        <th>Total Tasks</th>
+        <th>Failed Tasks</th>
+        <th>Succeeded Tasks</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
       </thead>
@@ -55,19 +54,24 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) {
   }
 
   private def createExecutorTable() : Seq[Node] = {
-    val executorIdToSummary = listener.executorIdToSummary
-    executorIdToSummary.toSeq.sortBy(_._1).map{
-      case (k,v) => {
-      <tr>
-        <td>{k}</td>
-        <td>{v.duration} ms</td>
-        <td>{v.totalTasks}</td>
-        <td>{v.failedTasks}</td>
-        <td>{v.succeedTasks}</td>
-        <td>{v.shuffleRead}</td>
-        <td>{v.shuffleWrite}</td>
-      </tr>
+    val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
+    executorIdToSummary match {
+      case Some(x) => {
+        x.toSeq.sortBy(_._1).map{
+          case (k,v) => {
+            <tr>
+              <td>{k}</td>
+              <td>{parent.formatDuration(v.duration)}</td>
+              <td>{v.failedTasks + v.succeededTasks}</td>
+              <td>{v.failedTasks}</td>
+              <td>{v.succeededTasks}</td>
+              <td>{Utils.bytesToString(v.shuffleRead)}</td>
+              <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+            </tr>
+          }
+        }
       }
+      case _ => { Seq[Node]() }
     }
   }
 }
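
The Some/None match above could equally be written with map/getOrElse. A
behaviorally identical drop-in sketch within the same class (illustration
only, not part of this commit):

    private def createExecutorTable(): Seq[Node] = {
      listener.stageIdToExecutorSummaries.get(stageId).map { execMap =>
        execMap.toSeq.sortBy(_._1).map { case (k, v) =>
          <tr>
            <td>{k}</td>
            <td>{parent.formatDuration(v.duration)}</td>
            <td>{v.failedTasks + v.succeededTasks}</td>
            <td>{v.failedTasks}</td>
            <td>{v.succeededTasks}</td>
            <td>{Utils.bytesToString(v.shuffleRead)}</td>
            <td>{Utils.bytesToString(v.shuffleWrite)}</td>
          </tr>
        }
      }.getOrElse(Seq[Node]())
    }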

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 653a84b..854afb6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -45,7 +45,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
       val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
       val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
       val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
-      val executorTable = new ExecutorTable(parent)
 
       val pools = listener.sc.getAllPools
       val poolTable = new PoolTable(pools, listener)
@@ -59,7 +58,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
            <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
            <li>
              <a href="#executors"><strong>Executor Summary:</strong></a>
-             {listener.executorIdToSummary.size}
+             {listener.stageIdToExecutorSummaries.size}
            </li>
            <li>
              <a href="#active"><strong>Active Stages:</strong></a>
@@ -82,8 +81,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
         } else {
           Seq()
         }} ++
-        <h4 id="executor">Executor Summary</h4> ++
-        executorTable.toNodeSeq++
         <h4 id="active">Active Stages ({activeStages.size})</h4> ++
         activeStagesTable.toNodeSeq++
         <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2635478..8c92ff1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,7 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageIdToTasksFailed = HashMap[Int, Int]()
   val stageIdToTaskInfos =
     HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-  val executorIdToSummary = HashMap[String, ExecutorSummary]()
+  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
@@ -115,9 +115,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList += ((taskStart.taskInfo, None, None))
     stageIdToTaskInfos(sid) = taskList
-    val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId,
-      op = new ExecutorSummary())
-    executorSummary.totalTasks += 1
   }
 
   override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
@@ -127,32 +124,39 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    // update executor summary
-    val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId)
+    val sid = taskEnd.task.stageId
+
+    // create executor summary map if necessary
+    val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+      op = new HashMap[String, ExecutorSummary]())
+    executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
+      op = new ExecutorSummary())
+
+    val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
     executorSummary match {
-      case Some(x) => {
+      case Some(y) => {
         // first update failed-task, succeed-task
         taskEnd.reason match {
-          case e: ExceptionFailure =>
-            x.failedTasks += 1
+          case Success =>
+            y.succeededTasks += 1
           case _ =>
-            x.succeedTasks += 1
+            y.failedTasks += 1
         }
 
         // update duration
-        x.duration += taskEnd.taskInfo.duration
+        y.duration += taskEnd.taskInfo.duration
 
         // update shuffle read/write
         val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
         shuffleRead match {
           case Some(s) =>
-            x.shuffleRead += s.remoteBytesRead
+            y.shuffleRead += s.remoteBytesRead
           case _ => {}
         }
         val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
         shuffleWrite match {
           case Some(s) => {
-            x.shuffleWrite += s.shuffleBytesWritten
+            y.shuffleWrite += s.shuffleBytesWritten
           }
           case _ => {}
         }
@@ -160,7 +164,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       case _ => {}
     }
 
-    val sid = taskEnd.task.stageId
     val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
     tasksActive -= taskEnd.taskInfo
 

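Two things are worth noting in the rewritten onTaskEnd. First, only Success
now counts as a succeeded task; the old match counted every
non-ExceptionFailure reason (including other failure types) as a success.
Second, the summary lookup does a getOrElseUpdate followed by a separate
get. Since getOrElseUpdate already returns the value, the pair can be
collapsed; a sketch of the same logic inside onTaskEnd (illustration only):

    val summary = stageIdToExecutorSummaries
      .getOrElseUpdate(sid, HashMap[String, ExecutorSummary]())
      .getOrElseUpdate(taskEnd.taskInfo.executorId, new ExecutorSummary())
    taskEnd.reason match {
      case Success => summary.succeededTasks += 1
      case _       => summary.failedTasks += 1
    }
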
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446..c077613 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -160,9 +160,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
           def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
           Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
         }
-
+      val executorTable = new ExecutorTable(parent, stageId)
       val content =
         summary ++
+        <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
         <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
         <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
         <h4>Tasks</h4> ++ taskTable

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/36060f4f/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 861d37a..67a57a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -19,26 +19,19 @@ package org.apache.spark.ui.jobs
 
 import org.scalatest.FunSuite
 import org.apache.spark.scheduler._
-import org.apache.spark.SparkContext
-import org.apache.spark.Success
+import org.apache.spark.{LocalSparkContext, SparkContext, Success}
 import org.apache.spark.scheduler.SparkListenerTaskStart
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 
-class JobProgressListenerSuite extends FunSuite {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
   test("test executor id to summary") {
-    val sc = new SparkContext("local", "joblogger")
+    val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
 
     // nothing in it
-    assert(listener.executorIdToSummary.size == 0)
-
-    // launched a task, should get an item in map
-    listener.onTaskStart(new SparkListenerTaskStart(
-      new ShuffleMapTask(0, null, null, 0, null),
-      new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
-    assert(listener.executorIdToSummary.size == 1)
+    assert(listener.stageIdToExecutorSummaries.size == 0)
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -47,20 +40,15 @@ class JobProgressListenerSuite extends FunSuite {
     taskInfo.finishTime = 1
     listener.onTaskEnd(new SparkListenerTaskEnd(
       new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
-    assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000)
+    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+      .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
     taskInfo.finishTime = 1
     listener.onTaskEnd(new SparkListenerTaskEnd(
       new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
-    assert(listener.executorIdToSummary.size == 1)
-
-    // launched a task
-    listener.onTaskStart(new SparkListenerTaskStart(
-      new ShuffleMapTask(0, null, null, 0, null),
-      new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
-    assert(listener.executorIdToSummary.size == 1)
+    assert(listener.stageIdToExecutorSummaries.size == 1)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -69,13 +57,8 @@ class JobProgressListenerSuite extends FunSuite {
     taskInfo.finishTime = 1
     listener.onTaskEnd(new SparkListenerTaskEnd(
       new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
-    assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000)
-
-    // launched a task in another exec
-    listener.onTaskStart(new SparkListenerTaskStart(
-      new ShuffleMapTask(0, null, null, 0, null),
-      new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)))
-    assert(listener.executorIdToSummary.size == 2)
+    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+      .shuffleRead == 2000)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -84,13 +67,7 @@ class JobProgressListenerSuite extends FunSuite {
     taskInfo.finishTime = 1
     listener.onTaskEnd(new SparkListenerTaskEnd(
       new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
-    assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000)
-
-    // do finalize
-    sc.stop()
-
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
+    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
+      .shuffleRead == 1000)
   }
 }
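
On the LocalSparkContext mixin added above: assuming the trait follows the
usual Spark test pattern of stopping a shared sc var and clearing
spark.driver.port / spark.hostPort after each test, it replaces the manual
teardown removed at the end of this test. A sketch of that assumption:

    // Assumption: the trait exposes a var sc that it stops after each test.
    // Assigning to it (rather than shadowing it with a local val) lets the
    // mixin handle the teardown:
    sc = new SparkContext("local", "test")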