Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:32:44 UTC

[2/7] git commit: Add number of bytes spilled to Web UI

Add number of bytes spilled to Web UI
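
The metric travels through a chain of hand-offs, all visible in the diff
below: ExternalAppendOnlyMap accumulates a running bytesSpilled total as it
spills, registerBytesSpilled publishes that total into SparkEnv.bytesSpilledMap
under the task's ID, the Executor copies it into TaskMetrics when the task
finishes (and clears the entry in its finally block), JobProgressListener
folds it into per-stage and per-executor totals, and StagePage/ExecutorTable
render it. A condensed sketch of the chain, using names from this commit:

    // ExternalAppendOnlyMap: every spill adds the in-memory map's size estimate
    bytesSpilled += mapSize
    // Aggregator / CoGroupedRDD: publish the total under the running task's ID
    combiners.registerBytesSpilled(context.attemptId)
    // Executor: copy into the task's metrics, then clean up
    m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0L)
    env.bytesSpilledMap.remove(taskId)
    // JobProgressListener: accumulate per stage for the web UI
    stageIdToBytesSpilled(sid) += metrics.map(_.bytesSpilled).getOrElse(0L)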


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bb8098f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bb8098f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bb8098f2

Branch: refs/heads/master
Commit: bb8098f203e61111faddf2e1a04b03d62037e6c7
Parents: e644715
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jan 10 21:40:55 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Jan 10 21:40:55 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  8 +++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  3 ++
 .../org/apache/spark/executor/Executor.scala    |  4 ++-
 .../org/apache/spark/executor/TaskMetrics.scala |  5 +++
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  4 ++-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 10 +++---
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |  1 +
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  2 ++
 .../spark/ui/jobs/JobProgressListener.scala     |  7 +++++
 .../org/apache/spark/ui/jobs/StagePage.scala    | 32 +++++++++++++++++---
 .../util/collection/ExternalAppendOnlyMap.scala | 13 +++++++-
 11 files changed, 75 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 8b30cd4..0609b7b 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,9 +32,9 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+  private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -53,11 +53,12 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
+      combiners.registerBytesSpilled(context.attemptId)
       combiners.iterator
     }
   }
 
-  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
+  def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kc: Product2[K, C] = null
@@ -75,6 +76,7 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
+      combiners.registerBytesSpilled(context.attemptId)
       combiners.iterator
     }
   }
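
Both combine methods now take the TaskContext purely so the external branch
can attribute its spill total to the running task; the in-memory
AppendOnlyMap branch ignores the context, so a task that never spills
reports zero bytes. A caller's-eye sketch of the new signatures (the rdd
value and type parameters here are stand-ins, not code from this commit):

    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    rdd.mapPartitionsWithContext((context, iter) =>
      aggregator.combineValuesByKey(iter, context),  // context.attemptId keys the spill map
      preservesPartitioning = true)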

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 08b592d..6ec0750 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,6 +60,9 @@ class SparkEnv private[spark] (
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
+  // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
+  val bytesSpilledMap = mutable.HashMap[Long, Long]()
+
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
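
Unlike shuffleMemoryMap above, which carries a "manually synchronized"
warning, bytesSpilledMap is accessed without locking: each entry is written
and read by its own task's thread, although distinct tasks do insert and
remove entries in the same unsynchronized HashMap concurrently. A toy
illustration of the per-task bookkeeping pattern (self-contained, not Spark
code):

    import scala.collection.mutable

    val bytesSpilledMap = mutable.HashMap[Long, Long]()
    val taskId = 42L
    bytesSpilledMap(taskId) = 2L * 1024 * 1024               // registerBytesSpilled
    val spilled = bytesSpilledMap.get(taskId).getOrElse(0L)  // Executor reads it back
    bytesSpilledMap.remove(taskId)                           // finally-block cleanup
    assert(spilled == 2097152L)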

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a7b2328..d5dae89 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,6 +229,7 @@ private[spark] class Executor(
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
           m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
+          m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0L)
         }
 
         val accumUpdates = Accumulators.values
@@ -279,11 +280,12 @@ private[spark] class Executor(
           //System.exit(1)
         }
       } finally {
-        // TODO: Unregister shuffle memory only for ShuffleMapTask
+        // TODO: Unregister shuffle memory only for ResultTask
         val shuffleMemoryMap = env.shuffleMemoryMap
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
+        env.bytesSpilledMap.remove(taskId)
         runningTasks.remove(taskId)
       }
     }
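
The read-then-remove ordering matters here: the metrics block runs on the
success path before the finally block clears the entry, and failed tasks
still have their entry removed. The lookup by taskId agrees with the
attemptId used by registerBytesSpilled because the executor passes its task
ID into Task.run as the attempt ID in this era of the code (paraphrased
sketch, not the literal source):

    val value = task.run(taskId)   // makes TaskContext.attemptId == taskId
    // ... later, when building metrics for the status update:
    m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0L)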

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bb1471d..44a1554 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,6 +49,11 @@ class TaskMetrics extends Serializable {
   var resultSerializationTime: Long = _
 
   /**
+   * The number of bytes spilled to disk by this task
+   */
+  var bytesSpilled: Long = _
+
+  /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
   var shuffleReadMetrics: Option[ShuffleReadMetrics] = None

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a73714a..6df2b3a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,7 +106,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+
+    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
 
@@ -150,6 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           map.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
+      map.registerBytesSpilled(context.attemptId)
       new InterruptibleIterator(context, map.iterator)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1248409..dd25d0c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
       }, preservesPartitioning = true)
     } else if (mapSideCombine) {
-      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+      val combined = self.mapPartitionsWithContext((context, iter) => {
+        aggregator.combineValuesByKey(iter, context)
+      }, preservesPartitioning = true)
       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
       partitioned.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
       }, preservesPartitioning = true)
     } else {
       // Don't apply map-side combiner.
       val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
       values.mapPartitionsWithContext((context, iter) => {
-        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
       }, preservesPartitioning = true)
     }
   }
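
Each call site switches from mapPartitions (or a bare method reference) to
mapPartitionsWithContext so the TaskContext reaches the Aggregator. For
reference, the shape of that combinator in this era of the RDD API is
roughly:

    def mapPartitionsWithContext[U: ClassTag](
        f: (TaskContext, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]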

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3c53e88..3f9cc97 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,4 +24,5 @@ private[spark] class ExecutorSummary {
   var succeededTasks : Int = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
+  var bytesSpilled : Long = 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0dd8764..2522fba 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,6 +48,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
         <th>Succeeded Tasks</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
+        <th>Bytes Spilled</th>
       </thead>
       <tbody>
         {createExecutorTable()}
@@ -80,6 +81,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
               <td>{v.succeededTasks}</td>
               <td>{Utils.bytesToString(v.shuffleRead)}</td>
               <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+              <td>{Utils.bytesToString(v.bytesSpilled)}</td>
             </tr>
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bcd2824..f2c8658 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,6 +52,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageIdToTime = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
+  val stageIdToBytesSpilled = HashMap[Int, Long]()
   val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
   val stageIdToTasksComplete = HashMap[Int, Int]()
   val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -78,6 +79,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
         stageIdToShuffleWrite.remove(s.stageId)
+        stageIdToBytesSpilled.remove(s.stageId)
         stageIdToTasksActive.remove(s.stageId)
         stageIdToTasksComplete.remove(s.stageId)
         stageIdToTasksFailed.remove(s.stageId)
@@ -149,6 +151,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         Option(taskEnd.taskMetrics).foreach { taskMetrics =>
           taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
           taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+          y.bytesSpilled += taskMetrics.bytesSpilled
         }
       }
       case _ => {}
@@ -184,6 +187,10 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     stageIdToShuffleWrite(sid) += shuffleWrite
     totalShuffleWrite += shuffleWrite
 
+    stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
+    val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
+    stageIdToBytesSpilled(sid) += bytesSpilled
+
     val taskList = stageIdToTaskInfos.getOrElse(
       sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList -= ((taskEnd.taskInfo, None, None))
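
The getOrElseUpdate-then-increment idiom initializes a stage's counter on
first use and accumulates across task-end events; the stage-eviction code
earlier in this file removes the entry again. The idiom in isolation (a
self-contained toy, not Spark code):

    import scala.collection.mutable.HashMap

    val stageIdToBytesSpilled = HashMap[Int, Long]()
    def record(sid: Int, bytesSpilled: Long) {
      stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
      stageIdToBytesSpilled(sid) += bytesSpilled
    }
    record(1, 512L)
    record(1, 256L)
    assert(stageIdToBytesSpilled(1) == 768L)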

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8dcfeac..bb10431 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,6 +56,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
       val hasShuffleRead = shuffleReadBytes > 0
       val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
       val hasShuffleWrite = shuffleWriteBytes > 0
+      val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
+      val hasBytesSpilled = bytesSpilled > 0
 
       var activeTime = 0L
       listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -81,6 +83,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
                 {Utils.bytesToString(shuffleWriteBytes)}
               </li>
             }
+            {if (hasBytesSpilled)
+            <li>
+              <strong>Bytes spilled: </strong>
+              {Utils.bytesToString(bytesSpilled)}
+            </li>
+            }
           </ul>
         </div>
 
@@ -89,9 +97,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
         Seq("Duration", "GC Time", "Result Ser Time") ++
         {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
+        {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
         Seq("Errors")
 
-      val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
+      val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
 
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -153,13 +162,20 @@ private[spark] class StagePage(parent: JobProgressUI) {
           }
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
+          val bytesSpilledSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.bytesSpilled.toDouble
+          }
+          val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+
           val listings: Seq[Seq[String]] = Seq(
             serializationQuantiles,
             serviceQuantiles,
             gettingResultQuantiles,
             schedulerDelayQuantiles,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
-            if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
+            if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
+            if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
 
           val quantileHeaders = Seq("Metric", "Min", "25th percentile",
             "Median", "75th percentile", "Max")
@@ -178,8 +194,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     }
   }
 
-
-  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
              (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
     def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
       trace.map(e => <span style="display:block;">{e.toString}</span>)
@@ -205,6 +220,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
       if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
 
+    val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
+    val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
+    val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
     <tr>
       <td>{info.index}</td>
       <td>{info.taskId}</td>
@@ -234,6 +253,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
            {shuffleWriteReadable}
          </td>
       }}
+      {if (bytesSpilled) {
+        <td sorttable_customkey={bytesSpilledSortable}>
+          {bytesSpilledReadable}
+        </td>
+      }}
       <td>{exception.map(e =>
         <span>
           {e.className} ({e.description})<br/>
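
The new cell follows the same pattern as the shuffle columns: render a
human-readable size but sort on the raw byte count via sorttable_customkey,
falling back to an empty string when a task has no metrics. Condensed from
the code above:

    val maybeBytesSpilled = metrics.map(_.bytesSpilled)
    val sortable = maybeBytesSpilled.map(_.toString).getOrElse("")
    val readable = maybeBytesSpilled.map(Utils.bytesToString).getOrElse("")
    <td sorttable_customkey={sortable}>{readable}</td>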

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index e3bcd89..0100083 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   }
 
   // Number of pairs in the in-memory map
-  private var numPairsInMemory = 0
+  private var numPairsInMemory = 0L
 
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
@@ -85,6 +85,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   // How many times we have spilled so far
   private var spillCount = 0
 
+  // Number of bytes spilled in total
+  private var bytesSpilled = 0L
+
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
   private val comparator = new KCComparator[K, C]
@@ -161,6 +164,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
     numPairsInMemory = 0
+    bytesSpilled += mapSize
+  }
+
+  /**
+   * Register the total number of bytes spilled by this task
+   */
+  def registerBytesSpilled(taskId: Long) {
+    SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
   }
 
   /**
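
Note what gets counted: spill() receives the in-memory map's estimated size,
so bytesSpilled measures the memory freed at each spill rather than the
bytes as serialized on disk, which may differ. A sketch of the insert path
that triggers it, assuming the surrounding 0.9-era code:

    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
      val mapSize = currentMap.estimateSize()  // estimated in-memory footprint
      spill(mapSize)                           // inside: bytesSpilled += mapSize
    }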