You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was not preserved in this plain-text copy).
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:32:43 UTC
[1/7] git commit: Induce spilling in ExternalAppendOnlyMapSuite
Updated Branches:
refs/heads/master 08b9fec93 -> 0ca0d4d65
Induce spilling in ExternalAppendOnlyMapSuite
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e6447152
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e6447152
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e6447152
Branch: refs/heads/master
Commit: e6447152b323a8fdf71ae3a8c1086ba6948e7512
Parents: 2e393cd
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jan 10 18:33:48 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Jan 10 18:33:48 2014 -0800
----------------------------------------------------------------------
.../collection/ExternalAppendOnlyMapSuite.scala | 77 +++++++++++---------
1 file changed, 44 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e6447152/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index ef957bb..c3391f3 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -9,22 +9,19 @@ import org.apache.spark.SparkContext._
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
- override def beforeEach() {
- val conf = new SparkConf(false)
- conf.set("spark.shuffle.externalSorting", "true")
- sc = new SparkContext("local", "test", conf)
- }
-
- val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
- val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
+ private val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
+ private val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
buffer += i
}
- val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
+ private val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
(buf1, buf2) => {
buf1 ++= buf2
}
test("simple insert") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
@@ -48,6 +45,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("insert with collision") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
@@ -67,6 +67,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("ordering") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map1.insert(1, 10)
@@ -109,6 +112,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("null keys and values") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map.insert(1, 5)
@@ -147,6 +153,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple aggregator") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
val result1 = rdd.reduceByKey(_+_).collect()
@@ -159,6 +168,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple cogroup") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
val result = rdd1.cogroup(rdd2).collect()
@@ -175,56 +186,56 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("spilling") {
- // TODO: Figure out correct memory parameters to actually induce spilling
- // System.setProperty("spark.shuffle.buffer.mb", "1")
- // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+ // TODO: Use SparkConf (which currently throws connection reset exception)
+ System.setProperty("spark.shuffle.memoryFraction", "0.001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
- // reduceByKey - should spill exactly 6 times
- val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ // reduceByKey - should spill ~8 times
+ val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max(_, _)).collect()
- assert(resultA.length == 5000)
+ assert(resultA.length == 50000)
resultA.foreach { case(k, v) =>
k match {
case 0 => assert(v == 1)
- case 2500 => assert(v == 5001)
- case 4999 => assert(v == 9999)
+ case 25000 => assert(v == 50001)
+ case 49999 => assert(v == 99999)
case _ =>
}
}
- // groupByKey - should spill exactly 11 times
- val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
+ // groupByKey - should spill ~17 times
+ val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
- assert(resultB.length == 2500)
+ assert(resultB.length == 25000)
resultB.foreach { case(i, seq) =>
i match {
case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
- case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
- case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
+ case 12500 => assert(seq.toSet == Set[Int](50000, 50001, 50002, 50003))
+ case 24999 => assert(seq.toSet == Set[Int](99996, 99997, 99998, 99999))
case _ =>
}
}
- // cogroup - should spill exactly 7 times
- val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
- val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
+ // cogroup - should spill ~7 times
+ val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
val resultC = rddC1.cogroup(rddC2).collect()
- assert(resultC.length == 1000)
+ assert(resultC.length == 10000)
resultC.foreach { case(i, (seq1, seq2)) =>
i match {
case 0 =>
assert(seq1.toSet == Set[Int](0))
- assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
- case 500 =>
- assert(seq1.toSet == Set[Int](500))
+ assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ case 5000 =>
+ assert(seq1.toSet == Set[Int](5000))
assert(seq2.toSet == Set[Int]())
- case 999 =>
- assert(seq1.toSet == Set[Int](999))
+ case 9999 =>
+ assert(seq1.toSet == Set[Int](9999))
assert(seq2.toSet == Set[Int]())
case _ =>
}
}
- }
- // TODO: Test memory allocation for multiple concurrently running tasks
+ System.clearProperty("spark.shuffle.memoryFraction")
+ }
}
[3/7] git commit: Get rid of spill map in SparkEnv
Posted by pw...@apache.org.
Get rid of spill map in SparkEnv
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8d40e722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8d40e722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8d40e722
Branch: refs/heads/master
Commit: 8d40e7222f2a0a421349621105dc4c69bd7f1bb8
Parents: bb8098f
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 22:34:33 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 22:34:33 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Aggregator.scala | 7 +++----
core/src/main/scala/org/apache/spark/SparkEnv.scala | 3 ---
core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 --
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 +-
.../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 8 +++-----
5 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 0609b7b..c46b7bd 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,13 +47,12 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
- val combiners =
- new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
@@ -76,7 +75,7 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ec0750..08b592d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,9 +60,6 @@ class SparkEnv private[spark] (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
- // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
- val bytesSpilledMap = mutable.HashMap[Long, Long]()
-
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5dae89..dd4aea0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,7 +229,6 @@ private[spark] class Executor(
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
- m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
}
val accumUpdates = Accumulators.values
@@ -285,7 +284,6 @@ private[spark] class Executor(
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
- env.bytesSpilledMap.remove(taskId)
runningTasks.remove(taskId)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6df2b3a..34e8341 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
- map.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = map.bytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0100083..f4e53c4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var spillCount = 0
// Number of bytes spilled in total
- private var bytesSpilled = 0L
+ private var _bytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -164,15 +164,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
- bytesSpilled += mapSize
+ _bytesSpilled += mapSize
}
/**
* Register the total number of bytes spilled by this task
*/
- def registerBytesSpilled(taskId: Long) {
- SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
- }
+ def bytesSpilled: Long = _bytesSpilled
/**
* Return an iterator that merges the in-memory map with the spilled maps.
[7/7] git commit: Merge pull request #401 from andrewor14/master
Posted by pw...@apache.org.
Merge pull request #401 from andrewor14/master
External sorting - Add number of bytes spilled to Web UI
Additionally, update test suite for external sorting to induce spilling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0ca0d4d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0ca0d4d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0ca0d4d6
Branch: refs/heads/master
Commit: 0ca0d4d657f4db519187cc27546104b38c38917f
Parents: 08b9fec 8399341
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 13 22:32:21 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 13 22:32:21 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 12 ++-
.../org/apache/spark/executor/Executor.scala | 2 +-
.../org/apache/spark/executor/TaskMetrics.scala | 10 +++
.../org/apache/spark/rdd/CoGroupedRDD.scala | 3 +
.../org/apache/spark/rdd/PairRDDFunctions.scala | 10 ++-
.../apache/spark/ui/jobs/ExecutorSummary.scala | 2 +
.../apache/spark/ui/jobs/ExecutorTable.scala | 4 +
.../spark/ui/jobs/JobProgressListener.scala | 14 ++++
.../org/apache/spark/ui/jobs/StagePage.scala | 53 +++++++++++++-
.../util/collection/ExternalAppendOnlyMap.scala | 11 ++-
.../collection/ExternalAppendOnlyMapSuite.scala | 77 +++++++++++---------
11 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ca0d4d6/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ca0d4d6/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
[6/7] git commit: Wording changes per Patrick
Posted by pw...@apache.org.
Wording changes per Patrick
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/83993414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/83993414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/83993414
Branch: refs/heads/master
Commit: 839934140f1a518acae8c60fe82c2253f911ea33
Parents: a1f0992
Author: Andrew Or <an...@gmail.com>
Authored: Mon Jan 13 20:51:38 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Mon Jan 13 20:51:38 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 4 ++--
.../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 10 +++++-----
2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/83993414/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 7b73253..ab03eb5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,8 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
- <th>Bytes Spilled (Memory)</th>
- <th>Bytes Spilled (Disk)</th>
+ <th>Shuffle Spill (Memory)</th>
+ <th>Shuffle Spill (Disk)</th>
</thead>
<tbody>
{createExecutorTable()}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/83993414/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8f89fad..113f76b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,11 +86,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
{if (hasBytesSpilled)
<li>
- <strong>Bytes spilled (memory): </strong>
+ <strong>Shuffle spill (memory): </strong>
{Utils.bytesToString(memoryBytesSpilled)}
</li>
<li>
- <strong>Bytes spilled (disk): </strong>
+ <strong>Shuffle spill (disk): </strong>
{Utils.bytesToString(diskBytesSpilled)}
</li>
}
@@ -102,7 +102,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
- {if (hasBytesSpilled) Seq("Bytes Spilled (Memory)", "Bytes Spilled (Disk)") else Nil} ++
+ {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
@@ -171,14 +171,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
case(info, metrics, exception) =>
metrics.get.memoryBytesSpilled.toDouble
}
- val memoryBytesSpilledQuantiles = "Bytes spilled (memory)" +:
+ val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +:
getQuantileCols(memoryBytesSpilledSizes)
val diskBytesSpilledSizes = validTasks.map {
case(info, metrics, exception) =>
metrics.get.diskBytesSpilled.toDouble
}
- val diskBytesSpilledQuantiles = "Bytes spilled (disk)" +:
+ val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +:
getQuantileCols(diskBytesSpilledSizes)
val listings: Seq[Seq[String]] = Seq(
[4/7] git commit: Enable external sorting by default
Posted by pw...@apache.org.
Enable external sorting by default
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/69c9aebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/69c9aebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/69c9aebe
Branch: refs/heads/master
Commit: 69c9aebed0dfd90e0a1c4d48cd24ea7ddd7624fa
Parents: 8d40e72
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 22:43:01 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 22:43:01 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Aggregator.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/69c9aebe/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c46b7bd..5fd90d0 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
private val sparkConf = SparkEnv.get.conf
- private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
+ private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/69c9aebe/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 34e8341..656c3ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -107,7 +107,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
- val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
+ val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
[2/7] git commit: Add number of bytes spilled to Web UI
Posted by pw...@apache.org.
Add number of bytes spilled to Web UI
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bb8098f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bb8098f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bb8098f2
Branch: refs/heads/master
Commit: bb8098f203e61111faddf2e1a04b03d62037e6c7
Parents: e644715
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jan 10 21:40:55 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Jan 10 21:40:55 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 8 +++--
.../main/scala/org/apache/spark/SparkEnv.scala | 3 ++
.../org/apache/spark/executor/Executor.scala | 4 ++-
.../org/apache/spark/executor/TaskMetrics.scala | 5 +++
.../org/apache/spark/rdd/CoGroupedRDD.scala | 4 ++-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 10 +++---
.../apache/spark/ui/jobs/ExecutorSummary.scala | 1 +
.../apache/spark/ui/jobs/ExecutorTable.scala | 2 ++
.../spark/ui/jobs/JobProgressListener.scala | 7 +++++
.../org/apache/spark/ui/jobs/StagePage.scala | 32 +++++++++++++++++---
.../util/collection/ExternalAppendOnlyMap.scala | 13 +++++++-
11 files changed, 75 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 8b30cd4..0609b7b 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,9 +32,9 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
private val sparkConf = SparkEnv.get.conf
- private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+ private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -53,11 +53,12 @@ case class Aggregator[K, V, C] (
val (k, v) = iter.next()
combiners.insert(k, v)
}
+ combiners.registerBytesSpilled(context.attemptId)
combiners.iterator
}
}
- def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
+ def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -75,6 +76,7 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
+ combiners.registerBytesSpilled(context.attemptId)
combiners.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 08b592d..6ec0750 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,6 +60,9 @@ class SparkEnv private[spark] (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
+ // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
+ val bytesSpilledMap = mutable.HashMap[Long, Long]()
+
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a7b2328..d5dae89 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,6 +229,7 @@ private[spark] class Executor(
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
+ m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
}
val accumUpdates = Accumulators.values
@@ -279,11 +280,12 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
- // TODO: Unregister shuffle memory only for ShuffleMapTask
+ // TODO: Unregister shuffle memory only for ResultTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
+ env.bytesSpilledMap.remove(taskId)
runningTasks.remove(taskId)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bb1471d..44a1554 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,6 +49,11 @@ class TaskMetrics extends Serializable {
var resultSerializationTime: Long = _
/**
+ * The number of bytes spilled to disk by this task
+ */
+ var bytesSpilled: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a73714a..6df2b3a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,7 +106,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
- val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
+
+ val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", false)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
@@ -150,6 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
+ map.registerBytesSpilled(context.attemptId)
new InterruptibleIterator(context, map.iterator)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1248409..dd25d0c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
- val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val combined = self.mapPartitionsWithContext((context, iter) => {
+ aggregator.combineValuesByKey(iter, context)
+ }, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
- new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
+ new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3c53e88..3f9cc97 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,4 +24,5 @@ private[spark] class ExecutorSummary {
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
+ var bytesSpilled : Long = 0
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0dd8764..2522fba 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,6 +48,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
+ <th>Bytes Spilled</th>
</thead>
<tbody>
{createExecutorTable()}
@@ -80,6 +81,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ <td>{Utils.bytesToString(v.bytesSpilled)}</td>
</tr>
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bcd2824..f2c8658 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,6 +52,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
+ val stageIdToBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -78,6 +79,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
+ stageIdToBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
@@ -149,6 +151,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+ y.bytesSpilled += taskMetrics.bytesSpilled
}
}
case _ => {}
@@ -184,6 +187,10 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
+ stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
+ val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
+ stageIdToBytesSpilled(sid) += bytesSpilled
+
val taskList = stageIdToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8dcfeac..bb10431 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,6 +56,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
+ val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
+ val hasBytesSpilled = bytesSpilled > 0
var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -81,6 +83,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
{Utils.bytesToString(shuffleWriteBytes)}
</li>
}
+ {if (hasBytesSpilled)
+ <li>
+ <strong>Bytes spilled: </strong>
+ {Utils.bytesToString(bytesSpilled)}
+ </li>
+ }
</ul>
</div>
@@ -89,9 +97,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
+ {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
Seq("Errors")
- val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
+ val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -153,13 +162,20 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+ val bytesSpilledSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.bytesSpilled.toDouble
+ }
+ val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+
val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
- if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
+ if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
+ if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -178,8 +194,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
}
-
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+ def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
@@ -205,6 +220,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
+ val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
+ val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
+ val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
@@ -234,6 +253,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
{shuffleWriteReadable}
</td>
}}
+ {if (bytesSpilled) {
+ <td sorttable_customkey={bytesSpilledSortable}>
+ {bytesSpilledReadable}
+ </td>
+ }}
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bb8098f2/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index e3bcd89..0100083 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
// Number of pairs in the in-memory map
- private var numPairsInMemory = 0
+ private var numPairsInMemory = 0L
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000
@@ -85,6 +85,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// How many times we have spilled so far
private var spillCount = 0
+ // Number of bytes spilled in total
+ private var bytesSpilled = 0L
+
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
@@ -161,6 +164,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
+ bytesSpilled += mapSize
+ }
+
+ /**
+ * Register the total number of bytes spilled by this task
+ */
+ def registerBytesSpilled(taskId: Long) {
+ SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
}
/**
[5/7] git commit: Report bytes spilled for both memory and disk on Web UI
Posted by pw...@apache.org.
Report bytes spilled for both memory and disk on Web UI
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a1f0992f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a1f0992f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a1f0992f
Branch: refs/heads/master
Commit: a1f0992faefbe042a9cb7a11842a817c958e4797
Parents: 69c9aeb
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 23:42:57 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 23:42:57 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 9 ++--
.../org/apache/spark/executor/TaskMetrics.scala | 9 +++-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 3 +-
.../apache/spark/ui/jobs/ExecutorSummary.scala | 3 +-
.../apache/spark/ui/jobs/ExecutorTable.scala | 6 ++-
.../spark/ui/jobs/JobProgressListener.scala | 19 +++++---
.../org/apache/spark/ui/jobs/StagePage.scala | 49 ++++++++++++++------
.../util/collection/ExternalAppendOnlyMap.scala | 12 ++---
8 files changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 5fd90d0..cda3a95 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,7 +34,8 @@ case class Aggregator[K, V, C] (
private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
+ context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -52,7 +53,8 @@ case class Aggregator[K, V, C] (
val (k, v) = iter.next()
combiners.insert(k, v)
}
- context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}
@@ -75,7 +77,8 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
combiners.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 44a1554..0c8f466 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,9 +49,14 @@ class TaskMetrics extends Serializable {
var resultSerializationTime: Long = _
/**
- * The number of bytes spilled to disk by this task
+ * The number of in-memory bytes spilled by this task
*/
- var bytesSpilled: Long = _
+ var memoryBytesSpilled: Long = _
+
+ /**
+ * The number of on-disk bytes spilled by this task
+ */
+ var diskBytesSpilled: Long = _
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 656c3ef..9c6b308 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
- context.taskMetrics.bytesSpilled = map.bytesSpilled
+ context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3f9cc97..64e22a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,5 +24,6 @@ private[spark] class ExecutorSummary {
var succeededTasks : Int = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
- var bytesSpilled : Long = 0
+ var memoryBytesSpilled : Long = 0
+ var diskBytesSpilled : Long = 0
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 2522fba..7b73253 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,7 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
- <th>Bytes Spilled</th>
+ <th>Bytes Spilled (Memory)</th>
+ <th>Bytes Spilled (Disk)</th>
</thead>
<tbody>
{createExecutorTable()}
@@ -81,7 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
- <td>{Utils.bytesToString(v.bytesSpilled)}</td>
+ <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
+ <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f2c8658..858a10c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,7 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
- val stageIdToBytesSpilled = HashMap[Int, Long]()
+ val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
+ val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -79,7 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
- stageIdToBytesSpilled.remove(s.stageId)
+ stageIdToMemoryBytesSpilled.remove(s.stageId)
+ stageIdToDiskBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
@@ -151,7 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
Option(taskEnd.taskMetrics).foreach { taskMetrics =>
taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
- y.bytesSpilled += taskMetrics.bytesSpilled
+ y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
+ y.diskBytesSpilled += taskMetrics.diskBytesSpilled
}
}
case _ => {}
@@ -187,9 +190,13 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
- stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
- val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
- stageIdToBytesSpilled(sid) += bytesSpilled
+ stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
+ val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
+ stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+
+ stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
+ val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
+ stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
val taskList = stageIdToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index bb10431..8f89fad 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,8 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
- val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
- val hasBytesSpilled = bytesSpilled > 0
+ val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
+ val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
+ val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0)
var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -85,8 +86,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
{if (hasBytesSpilled)
<li>
- <strong>Bytes spilled: </strong>
- {Utils.bytesToString(bytesSpilled)}
+ <strong>Bytes spilled (memory): </strong>
+ {Utils.bytesToString(memoryBytesSpilled)}
+ </li>
+ <li>
+ <strong>Bytes spilled (disk): </strong>
+ {Utils.bytesToString(diskBytesSpilled)}
</li>
}
</ul>
@@ -97,7 +102,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
- {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
+ {if (hasBytesSpilled) Seq("Bytes Spilled (Memory)", "Bytes Spilled (Disk)") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
@@ -162,11 +167,19 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val bytesSpilledSizes = validTasks.map {
+ val memoryBytesSpilledSizes = validTasks.map {
case(info, metrics, exception) =>
- metrics.get.bytesSpilled.toDouble
+ metrics.get.memoryBytesSpilled.toDouble
}
- val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+ val memoryBytesSpilledQuantiles = "Bytes spilled (memory)" +:
+ getQuantileCols(memoryBytesSpilledSizes)
+
+ val diskBytesSpilledSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.get.diskBytesSpilled.toDouble
+ }
+ val diskBytesSpilledQuantiles = "Bytes spilled (disk)" +:
+ getQuantileCols(diskBytesSpilledSizes)
val listings: Seq[Seq[String]] = Seq(
serializationQuantiles,
@@ -175,7 +188,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
- if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
+ if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
+ if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
@@ -220,9 +234,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
- val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
- val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
- val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+ val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+ val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
+ val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
+ val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
+ val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
+ val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
<tr>
<td>{info.index}</td>
@@ -254,8 +272,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
</td>
}}
{if (bytesSpilled) {
- <td sorttable_customkey={bytesSpilledSortable}>
- {bytesSpilledReadable}
+ <td sorttable_customkey={memoryBytesSpilledSortable}>
+ {memoryBytesSpilledReadable}
+ </td>
+ <td sorttable_customkey={diskBytesSpilledSortable}>
+ {diskBytesSpilledReadable}
</td>
}}
<td>{exception.map(e =>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a1f0992f/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f4e53c4..c63f47c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var spillCount = 0
// Number of bytes spilled in total
- private var _bytesSpilled = 0L
+ private var _memoryBytesSpilled = 0L
+ private var _diskBytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -153,6 +154,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
+ _diskBytesSpilled += writer.bytesWritten
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -164,13 +166,11 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
- _bytesSpilled += mapSize
+ _memoryBytesSpilled += mapSize
}
- /**
- * Register the total number of bytes spilled by this task
- */
- def bytesSpilled: Long = _bytesSpilled
+ def memoryBytesSpilled: Long = _memoryBytesSpilled
+ def diskBytesSpilled: Long = _diskBytesSpilled
/**
* Return an iterator that merges the in-memory map with the spilled maps.