Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:32:47 UTC
[5/7] git commit: Report bytes spilled for both memory and disk on Web UI
Report bytes spilled for both memory and disk on Web UI
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a1f0992f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a1f0992f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a1f0992f
Branch: refs/heads/master
Commit: a1f0992faefbe042a9cb7a11842a817c958e4797
Parents: 69c9aeb
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 23:42:57 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 23:42:57 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 9 ++--
.../org/apache/spark/executor/TaskMetrics.scala | 9 +++-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 3 +-
.../apache/spark/ui/jobs/ExecutorSummary.scala | 3 +-
.../apache/spark/ui/jobs/ExecutorTable.scala | 6 ++-
.../spark/ui/jobs/JobProgressListener.scala | 19 +++++---
.../org/apache/spark/ui/jobs/StagePage.scala | 49 ++++++++++++++------
.../util/collection/ExternalAppendOnlyMap.scala | 12 ++---
8 files changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 5fd90d0..cda3a95 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,7 +34,8 @@ case class Aggregator[K, V, C] (
private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
+ context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -52,7 +53,8 @@ case class Aggregator[K, V, C] (
val (k, v) = iter.next()
combiners.insert(k, v)
}
- context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}
@@ -75,7 +77,8 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 44a1554..0c8f466 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,9 +49,14 @@ class TaskMetrics extends Serializable {
var resultSerializationTime: Long = _
/**
- * The number of bytes spilled to disk by this task
+ * The number of in-memory bytes spilled by this task
*/
- var bytesSpilled: Long = _
+ var memoryBytesSpilled: Long = _
+
+ /**
+ * The number of on-disk bytes spilled by this task
+ */
+ var diskBytesSpilled: Long = _
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
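The split is visible to anything that consumes TaskMetrics. As a minimal, hypothetical sketch (the listener class is illustrative, not part of this commit), a SparkListener of this era can read both counters when a task finishes; taskMetrics may be null for failed tasks, hence the Option guard, mirroring what JobProgressListener does below:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative listener: logs the two new spill counters for each finished task.
    class SpillLogger extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        // taskMetrics may be null (e.g. for failed tasks), so guard with Option
        Option(taskEnd.taskMetrics).foreach { m =>
          println("task spilled " + m.memoryBytesSpilled + " bytes (memory), " +
            m.diskBytesSpilled + " bytes (disk)")
        }
      }
    }

    // registered with: sc.addSparkListener(new SpillLogger)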
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 656c3ef..9c6b308 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
- context.taskMetrics.bytesSpilled = map.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3f9cc97..64e22a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,5 +24,6 @@ private[spark] class ExecutorSummary {
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
- var bytesSpilled : Long = 0
+ var memoryBytesSpilled : Long = 0
+ var diskBytesSpilled : Long = 0
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 2522fba..7b73253 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,7 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
- <th>Bytes Spilled</th>
+ <th>Bytes Spilled (Memory)</th>
+ <th>Bytes Spilled (Disk)</th>
</thead>
<tbody>
{createExecutorTable()}
@@ -81,7 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
- <td>{Utils.bytesToString(v.bytesSpilled)}</td>
+ <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
+ <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f2c8658..858a10c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,7 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
- val stageIdToBytesSpilled = HashMap[Int, Long]()
+ val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
+ val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -79,7 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
- stageIdToBytesSpilled.remove(s.stageId)
+ stageIdToMemoryBytesSpilled.remove(s.stageId)
+ stageIdToDiskBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
@@ -151,7 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
- y.bytesSpilled += taskMetrics.bytesSpilled
+ y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
+ y.diskBytesSpilled += taskMetrics.diskBytesSpilled
}
}
case _ => {}
@@ -187,9 +190,13 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
- stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
- val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
- stageIdToBytesSpilled(sid) += bytesSpilled
+ stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
+ val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
+ stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+
+ stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
+ val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
+ stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
val taskList = stageIdToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
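The listener keeps one running total per stage for each counter, using a getOrElseUpdate-then-add pattern: create a zero entry the first time a stage is seen, then accumulate. A self-contained sketch of the same pattern (stage ids and byte counts are made up):

    import scala.collection.mutable.HashMap

    val stageIdToDiskBytesSpilled = HashMap[Int, Long]()

    def record(sid: Int, diskBytesSpilled: Long) {
      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)  // zero entry on first sight
      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled  // accumulate for this stage
    }

    record(0, 1024L)
    record(0, 2048L)
    assert(stageIdToDiskBytesSpilled(0) == 3072L)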
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index bb10431..8f89fad 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,8 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
- val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
- val hasBytesSpilled = bytesSpilled > 0
+ val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
+ val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
+ val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0)
var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -85,8 +86,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
{if (hasBytesSpilled)
<li>
- <strong>Bytes spilled: </strong>
- {Utils.bytesToString(bytesSpilled)}
+ <strong>Bytes spilled (memory): </strong>
+ {Utils.bytesToString(memoryBytesSpilled)}
+ </li>
+ <li>
+ <strong>Bytes spilled (disk): </strong>
+ {Utils.bytesToString(diskBytesSpilled)}
</li>
}
</ul>
@@ -97,7 +102,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
- {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
+ {if (hasBytesSpilled) Seq("Bytes Spilled (Memory)", "Bytes Spilled (Disk)") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
@@ -162,11 +167,19 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val bytesSpilledSizes = validTasks.map {
+ val memoryBytesSpilledSizes = validTasks.map {
case(info, metrics, exception) =>
- metrics.get.bytesSpilled.toDouble
+ metrics.get.memoryBytesSpilled.toDouble
}
- val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+ val memoryBytesSpilledQuantiles = "Bytes spilled (memory)" +:
+ getQuantileCols(memoryBytesSpilledSizes)
+
+ val diskBytesSpilledSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.diskBytesSpilled.toDouble
+ }
+ val diskBytesSpilledQuantiles = "Bytes spilled (disk)" +:
+ getQuantileCols(diskBytesSpilledSizes)
val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
@@ -175,7 +188,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
- if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
+ if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
+ if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -220,9 +234,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
- val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
- val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
- val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+ val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+ val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
+ val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
+ val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
+ val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
+ val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
<tr>
<td>{info.index}</td>
@@ -254,8 +272,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
</td>
}}
{if (bytesSpilled) {
- <td sorttable_customkey={bytesSpilledSortable}>
- {bytesSpilledReadable}
+ <td sorttable_customkey={memoryBytesSpilledSortable}>
+ {memoryBytesSpilledReadable}
+ </td>
+ <td sorttable_customkey={diskBytesSpilledSortable}>
+ {diskBytesSpilledReadable}
</td>
}}
<td>{exception.map(e =>
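As with the existing shuffle columns, each spill cell is rendered twice: the raw Long goes into sorttable_customkey so the column sorts numerically, and the Utils.bytesToString rendering is what the user sees. A hypothetical pairing (values are illustrative):

    val bytes = 3072L
    val sortable = bytes.toString             // "3072", used only for sorting
    val readable = Utils.bytesToString(bytes) // human-readable, e.g. "3.0 KB"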
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f4e53c4..c63f47c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var spillCount = 0
// Number of bytes spilled in total
- private var _bytesSpilled = 0L
+ private var _memoryBytesSpilled = 0L
+ private var _diskBytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -153,6 +154,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
+ _diskBytesSpilled += writer.bytesWritten
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -164,13 +166,11 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
- _bytesSpilled += mapSize
+ _memoryBytesSpilled += mapSize
}
- /**
- * Register the total number of bytes spilled by this task
- */
- def bytesSpilled: Long = _bytesSpilled
+ def memoryBytesSpilled: Long = _memoryBytesSpilled
+ def diskBytesSpilled: Long = _diskBytesSpilled
/**
* Return an iterator that merges the in-memory map with the spilled maps.
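Note that the two counters measure different quantities: _memoryBytesSpilled grows by the estimated in-memory size of the map at the moment it spills (mapSize), while _diskBytesSpilled grows by the bytes the writer actually commits to disk (writer.bytesWritten), so the two figures generally differ once the data is serialized. A hypothetical reading of the new accessors (constructing the map requires a live SparkEnv, and the combiner functions here are illustrative):

    // Sum Ints per key; assumes a running SparkContext so SparkEnv.get works.
    val map = new ExternalAppendOnlyMap[String, Int, Int](
      createCombiner = (v: Int) => v,
      mergeValue = (c: Int, v: Int) => c + v,
      mergeCombiners = (c1: Int, c2: Int) => c1 + c2)
    // ... call map.insert(key, value) enough times to force spills ...
    println(map.memoryBytesSpilled + " estimated in-memory bytes spilled as " +
      map.diskBytesSpilled + " bytes written to disk")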