Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/23 19:38:49 UTC

[7/9] git commit: added changes according to comments from rxin

added changes according to comments from rxin


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c979eecd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c979eecd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c979eecd

Branch: refs/heads/master
Commit: c979eecdf6a11462595aba9d5b8fc942682cf85d
Parents: 59e53fa
Author: wangda.tan <wh...@gmail.com>
Authored: Sun Dec 22 21:43:15 2013 +0800
Committer: wangda.tan <wh...@gmail.com>
Committed: Sun Dec 22 21:43:15 2013 +0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 24 +++++++------------
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |  5 ++--
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  4 ++--
 .../org/apache/spark/ui/jobs/IndexPage.scala    |  4 ----
 .../spark/ui/jobs/JobProgressListener.scala     | 25 +++++++-------------
 .../org/apache/spark/ui/jobs/StagePage.scala    |  4 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala   |  2 +-
 7 files changed, 24 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index f62ae37..a31a7e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
 
     val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
-      "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read",
+      "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
       "Shuffle Write")
 
     def execRow(kv: Seq[String]) = {
@@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
       // update shuffle read/write
       if (null != taskEnd.taskMetrics) {
-        val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
-        shuffleRead match {
-          case Some(s) =>
-            val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
-            executorToShuffleRead.put(eid, newShuffleRead)
-          case _ => {}
-        }
-        val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
-        shuffleWrite match {
-          case Some(s) => {
-            val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
-            executorToShuffleWrite.put(eid, newShuffleWrite)
-          }
-          case _ => {}
-        }
+        taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+          executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+            shuffleRead.remoteBytesRead))
+
+        taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+          executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+            shuffleWrite.shuffleBytesWritten))
       }
     }
   }
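
The hunk above collapses the explicit Option pattern matches into foreach calls. A minimal standalone sketch of that idiom, using a hypothetical ShuffleReadMetrics stand-in rather than Spark's real TaskMetrics types, and a per-executor accumulator map like executorToShuffleRead:

    import scala.collection.mutable

    case class ShuffleReadMetrics(remoteBytesRead: Long)  // simplified stand-in

    object FoldShuffleRead {
      val executorToShuffleRead = mutable.HashMap[String, Long]()

      def record(eid: String, metrics: Option[ShuffleReadMetrics]): Unit = {
        // Equivalent to `metrics match { case Some(s) => ...; case _ => }`,
        // but without the empty default branch.
        metrics.foreach { s =>
          executorToShuffleRead.put(eid,
            executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead)
        }
      }

      def main(args: Array[String]): Unit = {
        record("exec-1", Some(ShuffleReadMetrics(100L)))
        record("exec-1", Some(ShuffleReadMetrics(50L)))
        record("exec-2", None)            // no metrics reported; map untouched
        println(executorToShuffleRead)    // exec-1 -> 150
      }
    }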

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 75c0dd2..3c53e88 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.ui.jobs
 
-private[spark] class ExecutorSummary() {
-  var duration : Long = 0
+/** Class for reporting aggregated metrics for each executor in the stage UI. */
+private[spark] class ExecutorSummary {
+  var taskTime : Long = 0
   var failedTasks : Int = 0
   var succeededTasks : Int = 0
   var shuffleRead : Long = 0
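
ExecutorSummary is a plain mutable accumulator, and the stage page keys one instance per executor ID (the listener exposes them via stageIdToExecutorSummaries). A rough sketch of that usage; the surrounding map and update code here are illustrative only, not the listener's actual wiring:

    import scala.collection.mutable

    class ExecutorSummary {
      var taskTime: Long = 0
      var failedTasks: Int = 0
      var succeededTasks: Int = 0
      var shuffleRead: Long = 0
      var shuffleWrite: Long = 0
    }

    // One summary per executor ID for a given stage.
    val executorSummaries = mutable.HashMap[String, ExecutorSummary]()

    // On each task end: fetch-or-create the executor's summary, then bump counters.
    val summary = executorSummaries.getOrElseUpdate("exec-1", new ExecutorSummary)
    summary.taskTime += 1234L
    summary.succeededTasks += 1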

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 763d5a3..0e9dd4a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
     <table class="table table-bordered table-striped table-condensed sortable">
       <thead>
         <th>Executor ID</th>
-        <th>Duration</th>
+        <th>Task Time</th>
         <th>Total Tasks</th>
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
@@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
           case (k,v) => {
             <tr>
               <td>{k}</td>
-              <td>{parent.formatDuration(v.duration)}</td>
+              <td>{parent.formatDuration(v.taskTime)}</td>
               <td>{v.failedTasks + v.succeededTasks}</td>
               <td>{v.failedTasks}</td>
               <td>{v.succeededTasks}</td>
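
parent.formatDuration turns the accumulated task time in milliseconds into a readable string; its real implementation is not part of this diff. A hypothetical stand-in with the same shape, purely for illustration:

    // Hypothetical stand-in for parent.formatDuration: milliseconds to a short label.
    def formatDuration(ms: Long): String = {
      if (ms < 1000) s"$ms ms"
      else if (ms < 60 * 1000) "%.1f s".format(ms / 1000.0)
      else if (ms < 60 * 60 * 1000) "%.1f min".format(ms / (60.0 * 1000))
      else "%.1f h".format(ms / (60.0 * 60 * 1000))
    }

    formatDuration(950L)      // "950 ms"
    formatDuration(12500L)    // "12.5 s"
    formatDuration(540000L)   // "9.0 min"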

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 854afb6..ca5a286 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -57,10 +57,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
            </li>
            <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
            <li>
-             <a href="#executors"><strong>Executor Summary:</strong></a>
-             {listener.stageIdToExecutorSummaries.size}
-           </li>
-           <li>
              <a href="#active"><strong>Active Stages:</strong></a>
              {activeStages.size}
            </li>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 64ce715..07a42f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         }
 
         // update duration
-        y.duration += taskEnd.taskInfo.duration
-
-        // update shuffle read/write
-        if (null != taskEnd.taskMetrics) {
-          val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
-          shuffleRead match {
-            case Some(s) =>
-              y.shuffleRead += s.remoteBytesRead
-            case _ => {}
-          }
-          val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
-          shuffleWrite match {
-            case Some(s) => {
-              y.shuffleWrite += s.shuffleBytesWritten
-            }
-            case _ => {}
-          }
+        y.taskTime += taskEnd.taskInfo.duration
+
+        taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
+          y.shuffleRead += shuffleRead.remoteBytesRead
+        }
+
+        taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+          y.shuffleWrite += shuffleWrite.shuffleBytesWritten
         }
       }
       case _ => {}
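
Note that, unlike the ExecutorsUI hunk, this version no longer checks `if (null != taskEnd.taskMetrics)` before dereferencing the metrics. If taskMetrics can still arrive as null on this code path, the same foreach accumulation can be made null-safe by wrapping it in Option first. A sketch with simplified stand-in types (not Spark's real TaskMetrics):

    case class ShuffleReadMetrics(remoteBytesRead: Long)
    case class ShuffleWriteMetrics(shuffleBytesWritten: Long)
    case class TaskMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics],
                           shuffleWriteMetrics: Option[ShuffleWriteMetrics])

    class ExecutorSummary { var shuffleRead = 0L; var shuffleWrite = 0L }

    def accumulate(y: ExecutorSummary, taskMetrics: TaskMetrics): Unit = {
      // Option(taskMetrics) yields None when taskMetrics is null, so both
      // nested foreach calls become no-ops instead of throwing an NPE.
      Option(taskMetrics).foreach { m =>
        m.shuffleReadMetrics.foreach(s => y.shuffleRead += s.remoteBytesRead)
        m.shuffleWriteMetrics.foreach(s => y.shuffleWrite += s.shuffleBytesWritten)
      }
    }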

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c077613..d8a6c9e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
         <div>
           <ul class="unstyled">
             <li>
-              <strong>Total duration across all tasks: </strong>
+              <strong>Total task time across all tasks: </strong>
               {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
             {if (hasShuffleRead)
@@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
       val executorTable = new ExecutorTable(parent, stageId)
       val content =
         summary ++
-        <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
         <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
         <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+        <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
         <h4>Tasks</h4> ++ taskTable
 
       headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
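
The page body is assembled by concatenating Scala XML literals and NodeSeqs with ++, so moving the executor table after the summary metrics is just a reordering of operands. A tiny self-contained sketch of that composition style, with placeholder tables standing in for the real ones:

    import scala.xml.NodeSeq

    val summary: NodeSeq       = <div>summary goes here</div>
    val summaryTable: NodeSeq  = <table><tr><td>per-task quantiles</td></tr></table>
    val executorTable: NodeSeq = <table><tr><td>per-executor totals</td></tr></table>
    val taskTable: NodeSeq     = <table><tr><td>one row per task</td></tr></table>

    // Same shape as the hunk above: headings interleaved with tables, in render order.
    val content: NodeSeq =
      summary ++
      <h4>Summary Metrics</h4> ++ summaryTable ++
      <h4>Aggregated Metrics by Executors</h4> ++ executorTable ++
      <h4>Tasks</h4> ++ taskTable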

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c979eecd/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3..463d85d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
         {if (isFairScheduler) {<th>Pool Name</th>} else {}}
         <th>Description</th>
         <th>Submitted</th>
-        <th>Duration</th>
+        <th>Task Time</th>
         <th>Tasks: Succeeded/Total</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>