Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:25 UTC

[08/50] git commit: Add toggle for ExternalAppendOnlyMap in Aggregator and CoGroupedRDD

Add toggle for ExternalAppendOnlyMap in Aggregator and CoGroupedRDD


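For context, the toggle introduced by this commit is read through a JVM system property, so it can be flipped without touching the shuffle code itself. Below is a minimal sketch of how it could be enabled from a driver program; the property name "spark.shuffle.externalSorting" and its "false" default are taken from the diff below, while the job itself (local master, reduceByKey over a tiny dataset) is purely illustrative and not part of the commit.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // pair RDD implicits (reduceByKey)

    object ExternalSortingToggleExample {
      def main(args: Array[String]): Unit = {
        // Aggregator and CoGroupedRDD read this via System.getProperty, so it
        // must be set before any shuffle runs. Default "false" keeps the
        // in-memory AppendOnlyMap; "true" switches to ExternalAppendOnlyMap.
        System.setProperty("spark.shuffle.externalSorting", "true")

        val sc = new SparkContext("local", "toggle-example")
        val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
          .reduceByKey(_ + _)   // goes through Aggregator.combineValuesByKey
          .collect()
        counts.foreach(println)
        sc.stop()
      }
    }
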
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2a2ca2a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2a2ca2a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2a2ca2a6

Branch: refs/heads/master
Commit: 2a2ca2a6610571bac45cdcedbf0c30927cea7c22
Parents: 28685a4
Author: Andrew Or <an...@gmail.com>
Authored: Wed Dec 25 17:32:01 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     | 47 +++++++++++++++-----
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 39 ++++++++++++----
 .../spark/util/ExternalAppendOnlyMap.scala      |  3 --
 3 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c51fb1d..ecaeb2d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,22 +32,45 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    while (iter.hasNext) {
-      val kv = iter.next()
-      combiners.insert(kv._1, kv._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kv: Product2[K, V] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
+      }
+      while (iter.hasNext) {
+        kv = iter.next()
+        combiners.changeValue(kv._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      iter.foreach { case(k, v) => combiners.insert(k, v) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners)
-    while (iter.hasNext) {
-      val kc = iter.next()
-      combiners.insert(kc._1, kc._2)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    if (!externalSorting) {
+      val combiners = new AppendOnlyMap[K,C]
+      var kc: Product2[K, C] = null
+      val update = (hadValue: Boolean, oldValue: C) => {
+        if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+      }
+      while (iter.hasNext) {
+        kc = iter.next()
+        combiners.changeValue(kc._1, update)
+      }
+      combiners.iterator
+    } else {
+      // Spilling
+      def combinerIdentity(combiner:C) = combiner
+      val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
+      iter.foreach { case(k, c) => combiners.insert(k, c) }
+      combiners.iterator
     }
-    combiners.iterator
   }
 }
\ No newline at end of file

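To make the non-spilling branch above easier to follow, here is a small self-contained sketch of the same createCombiner/mergeValue update rule, using a plain mutable.HashMap in place of Spark's AppendOnlyMap. The object and method names, and the word-count combiners in main, are illustrative and not part of this commit.

    import scala.collection.mutable

    object CombineByKeySketch {
      def combineValuesByKey[K, V, C](
          iter: Iterator[(K, V)],
          createCombiner: V => C,
          mergeValue: (C, V) => C): Iterator[(K, C)] = {
        val combiners = new mutable.HashMap[K, C]
        for ((k, v) <- iter) {
          // Same rule as the closure passed to AppendOnlyMap.changeValue:
          // create a combiner on first sight of a key, merge into it afterwards.
          combiners(k) = combiners.get(k) match {
            case Some(c) => mergeValue(c, v)
            case None    => createCombiner(v)
          }
        }
        combiners.iterator
      }

      def main(args: Array[String]): Unit = {
        val words = Iterator(("spark", 1), ("shuffle", 1), ("spark", 1))
        combineValuesByKey[String, Int, Int](words, v => v, _ + _).foreach(println)
        // -> (spark,2) and (shuffle,1), in some order
      }
    }
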
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a2a3de7..a7265f3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -102,28 +102,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-    val combiners = createExternalMap(numRdds)
-
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
+
+    // A list of (rdd iterator, dependency number) pairs
+    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+        val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum)
+        rddIterators += v
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
+        val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
+        rddIterators += v
+      }
+    }
+
+    if (!externalSorting) {
+      val map = new AppendOnlyMap[K, CoGroupCombiner]
+      val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+        if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
+      }
+      val getSeq = (k: K) => map.changeValue(k, update)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => getSeq(k)(depNum) += v
+        }
+      }
+      new InterruptibleIterator(context, map.iterator)
+    } else {
+      // Spilling
+      val map = createExternalMap(numRdds)
+      rddIterators.foreach { case(iter, depNum) =>
+        iter.foreach {
+          case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
         }
       }
+      new InterruptibleIterator(context, map.iterator)
     }
-    new InterruptibleIterator(context, combiners.iterator)
   }
 
   private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {

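The in-memory cogroup path above builds, for every key, one ArrayBuffer per parent RDD, indexed by dependency number. The sketch below mirrors that structure with a mutable.HashMap standing in for AppendOnlyMap; the sample (iterator, depNum) pairs and key names are illustrative only.

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    object CoGroupSketch {
      def main(args: Array[String]): Unit = {
        val numRdds = 2
        // (rdd iterator, dependency number) pairs, mirroring rddIterators above
        val rddIterators = Seq(
          (Iterator(("k1", "a1"), ("k2", "a2")), 0),
          (Iterator(("k1", "b1")), 1))

        val map = new mutable.HashMap[String, Array[ArrayBuffer[Any]]]
        // One buffer per parent RDD, created lazily on first sight of a key
        def getSeq(k: String): Array[ArrayBuffer[Any]] =
          map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))

        for ((iter, depNum) <- rddIterators; (k, v) <- iter) {
          getSeq(k)(depNum) += v
        }
        // k1 -> List(List(a1), List(b1)); k2 -> List(List(a2), List())
        map.foreach { case (k, bufs) => println(k + " -> " + bufs.map(_.toList).toList) }
      }
    }
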
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a2ca2a6/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index e2205c6..790dcf0 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -39,11 +39,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
-      println("* Merge before spill *")
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
         mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
     } else {
-      println("* Merge after spill *")
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
@@ -103,7 +101,6 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
   }
 
   def spill(): Unit = {
-    println("> SPILL <")
     val file = File.createTempFile("external_append_only_map", "")  // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
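The last hunk shows the spill step: the in-memory entries are sorted by key hash and serialized to a temporary file. Here is a hedged, stand-alone sketch of that step; the entry type, the spill signature, and the read-back in main are illustrative, not the commit's actual SpillableAppendOnlyMap code.

    import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

    object SpillSketch {
      def spill(currentMap: Map[String, Int]): File = {
        val file = File.createTempFile("external_append_only_map", "")
        val out = new ObjectOutputStream(new FileOutputStream(file))
        try {
          // Sorting by hash code gives every spill file the same key order,
          // so spilled files can later be merged with a streaming merge.
          val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
          sortedMap.foreach(out.writeObject)
        } finally {
          out.close()
        }
        file
      }

      def main(args: Array[String]): Unit = {
        val f = spill(Map("a" -> 1, "b" -> 2, "c" -> 3))
        val in = new ObjectInputStream(new FileInputStream(f))
        try {
          (1 to 3).foreach(_ => println(in.readObject()))
        } finally {
          in.close()
        }
      }
    }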