Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:25 UTC
[08/50] git commit: Add toggle for ExternalAppendOnlyMap in Aggregator and CoGroupedRDD
Add toggle for ExternalAppendOnlyMap in Aggregator and CoGroupedRDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2a2ca2a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2a2ca2a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2a2ca2a6
Branch: refs/heads/master
Commit: 2a2ca2a6610571bac45cdcedbf0c30927cea7c22
Parents: 28685a4
Author: Andrew Or <an...@gmail.com>
Authored: Wed Dec 25 17:32:01 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 47 +++++++++++++++-----
.../org/apache/spark/rdd/CoGroupedRDD.scala | 39 ++++++++++++----
.../spark/util/ExternalAppendOnlyMap.scala | 3 --
3 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
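Note: every new code path in this commit is gated on the spark.shuffle.externalSorting
system property, read with a default of "false", so jobs keep the purely in-memory
AppendOnlyMap behavior unless they opt in. A minimal sketch of opting in from the
driver side (the property name and default are taken verbatim from the diff; the
surrounding comments are illustrative, not part of the commit):

    // Illustrative only: flip the toggle before any shuffle tasks run.
    System.setProperty("spark.shuffle.externalSorting", "true")

    // Task-side code then reads it exactly as the diff does:
    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean

With the property unset or "false", Aggregator and CoGroupedRDD build combiners in an
AppendOnlyMap; with "true" they fall through to the spilling ExternalAppendOnlyMap branch.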
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c51fb1d..ecaeb2d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,22 +32,45 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    while (iter.hasNext) {
-      val kv = iter.next()
-      combiners.insert(kv._1, kv._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kv: Product2[K, V] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
+      }
+      while (iter.hasNext) {
+        kv = iter.next()
+        combiners.changeValue(kv._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      iter.foreach { case(k, v) => combiners.insert(k, v) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners)
-    while (iter.hasNext) {
-      val kc = iter.next()
-      combiners.insert(kc._1, kc._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kc: Product2[K, C] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+      }
+      while (iter.hasNext) {
+        kc = iter.next()
+        combiners.changeValue(kc._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      def combinerIdentity(combiner:C) = combiner
+      val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
+      iter.foreach { case(k, c) => combiners.insert(k, c) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 }
\ No newline at end of file
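Both non-spilling branches above use AppendOnlyMap.changeValue with an update closure
of type (Boolean, C) => C: the Boolean flags whether the key was already present, so a
single probe both creates and merges a combiner, and the captured var kv/kc lets one
closure instance serve every record. A self-contained sketch of the same create-or-merge
semantics using a plain mutable.HashMap (all names here are illustrative, not Spark's,
and the single-probe optimization and spilling are omitted):

    import scala.collection.mutable

    // Sketch of combineValuesByKey's merge logic.
    def combineByKey[K, V, C](
        iter: Iterator[(K, V)],
        createCombiner: V => C,
        mergeValue: (C, V) => C): Iterator[(K, C)] = {
      val combiners = mutable.HashMap.empty[K, C]
      for ((k, v) <- iter) {
        combiners(k) = combiners.get(k) match {
          case Some(c) => mergeValue(c, v)    // hadValue == true
          case None    => createCombiner(v)   // hadValue == false
        }
      }
      combiners.iterator
    }

    // e.g. word count:
    // combineByKey[String, Int, Int](words.map((_, 1)).iterator, identity, _ + _)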
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a2a3de7..a7265f3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -102,28 +102,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-    val combiners = createExternalMap(numRdds)
-
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
+
+    // A list of (rdd iterator, dependency number) pairs
+    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+        val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum)
+        rddIterators += v
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
+        val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
+        rddIterators += v
+      }
+    }
+
+    if (!externalSorting) {
+      val map = new AppendOnlyMap[K, CoGroupCombiner]
+      val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+        if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
+      }
+      val getSeq = (k: K) => map.changeValue(k, update)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => getSeq(k)(depNum) += v
+        }
+      }
+      new InterruptibleIterator(context, map.iterator)
+    } else {
+      // Spilling
+      val map = createExternalMap(numRdds)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
         }
       }
+      new InterruptibleIterator(context, map.iterator)
     }
-    new InterruptibleIterator(context, combiners.iterator)
   }
 
   private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
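The restructured compute first collects one (iterator, dependency number) pair per
parent and only then chooses a map implementation; in the in-memory branch each key
maps to numRdds ArrayBuffers, one per co-grouped source, allocated on first sight of
the key. A toy standalone version of that grouping step (the names and the local-Map
stand-in are mine; Seq.fill stands in for the diff's Array.fill):

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    // Toy cogroup: route each (k, v) into the buffer for its source RDD.
    def cogroupLocally[K](
        rddIterators: Seq[(Iterator[(K, Any)], Int)],  // (iterator, dependency number)
        numRdds: Int): Map[K, Seq[ArrayBuffer[Any]]] = {
      val map = mutable.Map.empty[K, Seq[ArrayBuffer[Any]]]
      for ((iter, depNum) <- rddIterators; (k, v) <- iter) {
        // Mirrors the update closure: allocate numRdds buffers on first sight of k.
        val buffers = map.getOrElseUpdate(k, Seq.fill(numRdds)(new ArrayBuffer[Any]))
        buffers(depNum) += v
      }
      map.toMap
    }

    // cogroupLocally(Seq((Iterator("a" -> 1), 0), (Iterator("a" -> "x"), 1)), 2)
    // => Map(a -> List(ArrayBuffer(1), ArrayBuffer(x)))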
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index e2205c6..790dcf0 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -39,11 +39,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
-      println("* Merge before spill *")
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
         mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
     } else {
-      println("* Merge after spill *")
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
@@ -103,7 +101,6 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
   }
 
   def spill(): Unit = {
-    println("> SPILL <")
     val file = File.createTempFile("external_append_only_map", "") // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
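The two printlns removed above were debug output in the merge-strategy selection and in
spill(). For orientation, the spill() shown in context serializes the in-memory map to a
temp file as a run sorted by key hashCode, which is what allows spilled runs to be merged
key-by-key later. A stripped-down sketch of just that sort-and-spill step (this
simplification is mine, not Spark's actual class):

    import java.io.{File, FileOutputStream, ObjectOutputStream}

    // Sketch: persist a map's entries as an on-disk run sorted by key hash,
    // mirroring the sortBy(kv => kv._1.hashCode()) line in spill() above.
    def spillSortedRun[K, V](entries: Iterator[(K, V)]): File = {
      val file = File.createTempFile("external_append_only_map", "")
      val out = new ObjectOutputStream(new FileOutputStream(file))
      try {
        entries.toList
          .sortBy { case (k, _) => k.hashCode() }
          .foreach(out.writeObject)  // one serialized (K, V) pair per entry
      } finally {
        out.close()
      }
      file
    }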