Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:32:45 UTC
[3/7] git commit: Get rid of spill map in SparkEnv
Get rid of spill map in SparkEnv
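
For context: this change drops the SparkEnv-level bytesSpilledMap (a shared map keyed
by task ID) and instead has ExternalAppendOnlyMap expose its spill counter through a
read-only bytesSpilled accessor, which callers such as Aggregator and CoGroupedRDD copy
into context.taskMetrics.bytesSpilled. Below is a minimal, self-contained sketch of that
pattern; SimpleTaskMetrics and SimpleSpillingMap are illustrative stand-ins, not Spark's
actual TaskMetrics or ExternalAppendOnlyMap.

object SpillMetricsSketch {

  // Per-task metrics holder, analogous to TaskMetrics.bytesSpilled in the diff.
  class SimpleTaskMetrics {
    var bytesSpilled: Long = 0L
  }

  // A collection that occasionally spills to disk and tracks how many bytes it has
  // written, exposing the counter via the private-var-plus-accessor pattern
  // (`private var _bytesSpilled` / `def bytesSpilled`) introduced by this commit.
  class SimpleSpillingMap {
    private var _bytesSpilled = 0L

    def insert(key: String, value: Int): Unit = {
      // Pretend every insert spills a fixed number of bytes so the counter moves.
      _bytesSpilled += 8L
    }

    def bytesSpilled: Long = _bytesSpilled
  }

  def main(args: Array[String]): Unit = {
    val metrics = new SimpleTaskMetrics
    val map = new SimpleSpillingMap

    Seq("a" -> 1, "b" -> 2, "c" -> 3).foreach { case (k, v) => map.insert(k, v) }

    // Instead of writing into a global task-ID -> bytes map in SparkEnv, the caller
    // records the spill total directly on the task's own metrics, mirroring
    // `context.taskMetrics.bytesSpilled = combiners.bytesSpilled` in Aggregator.scala.
    metrics.bytesSpilled = map.bytesSpilled

    println(s"bytes spilled by this task: ${metrics.bytesSpilled}")
  }
}

Keeping the counter on the task's own metrics also removes the per-task cleanup that the
shared map required in Executor.scala (the removed env.bytesSpilledMap.remove(taskId) call).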
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8d40e722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8d40e722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8d40e722
Branch: refs/heads/master
Commit: 8d40e7222f2a0a421349621105dc4c69bd7f1bb8
Parents: bb8098f
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 22:34:33 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 22:34:33 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Aggregator.scala | 7 +++----
core/src/main/scala/org/apache/spark/SparkEnv.scala | 3 ---
core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 --
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 +-
.../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 8 +++-----
5 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 0609b7b..c46b7bd 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,13 +47,12 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
- val combiners =
- new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
@@ -76,7 +75,7 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ec0750..08b592d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,9 +60,6 @@ class SparkEnv private[spark] (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
- // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
- val bytesSpilledMap = mutable.HashMap[Long, Long]()
-
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5dae89..dd4aea0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,7 +229,6 @@ private[spark] class Executor(
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
- m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
}
val accumUpdates = Accumulators.values
@@ -285,7 +284,6 @@ private[spark] class Executor(
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
- env.bytesSpilledMap.remove(taskId)
runningTasks.remove(taskId)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6df2b3a..34e8341 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
- map.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = map.bytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0100083..f4e53c4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var spillCount = 0
// Number of bytes spilled in total
- private var bytesSpilled = 0L
+ private var _bytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -164,15 +164,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
- bytesSpilled += mapSize
+ _bytesSpilled += mapSize
}
/**
* Register the total number of bytes spilled by this task
*/
- def registerBytesSpilled(taskId: Long) {
- SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
- }
+ def bytesSpilled: Long = _bytesSpilled
/**
* Return an iterator that merges the in-memory map with the spilled maps.