Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:19 UTC

[02/50] git commit: Refactor ExternalAppendOnlyMap to take in KVC instead of just KV

Refactor ExternalAppendOnlyMap to take in KVC instead of just KV


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/17def8cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/17def8cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/17def8cc

Branch: refs/heads/master
Commit: 17def8cc1132a5c94d895dacba4217ef9a0e5bd0
Parents: 6a45ec1
Author: Andrew Or <an...@gmail.com>
Authored: Tue Dec 24 16:15:02 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     | 20 ++---
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 46 +++++-----
 .../spark/util/ExternalAppendOnlyMap.scala      | 88 +++++++++++---------
 3 files changed, 78 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
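
In other words, the map is now typed by key, value, and combiner (K, V, C) and is constructed from the same three functions an Aggregator carries (createCombiner, mergeValue, mergeCombiners), so callers push raw values with insert(k, v) instead of supplying their own changeValue closure. As a rough illustration of that contract only (not code from this commit, and ignoring spilling entirely), a hypothetical in-memory stand-in might look like:

    import scala.collection.mutable

    // Hypothetical stand-in showing only the K/V/C contract of the new class.
    // mergeCombiners only matters once spilled maps are merged back, so it is unused here.
    class SimpleKVCMap[K, V, C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C) extends Iterable[(K, C)] {

      private val combiners = mutable.Map[K, C]()

      // Counterpart of the new insert(): build or update the combiner for this key.
      def insert(key: K, value: V): Unit = {
        combiners(key) = combiners.get(key) match {
          case Some(c) => mergeValue(c, value)
          case None    => createCombiner(value)
        }
      }

      override def iterator: Iterator[(K, C)] = combiners.iterator
    }

    // Usage: group values per key.
    val grouped = new SimpleKVCMap[String, Int, List[Int]](
      v => List(v), (c, v) => v :: c, (c1, c2) => c1 ++ c2)
    Seq("a" -> 1, "b" -> 2, "a" -> 3).foreach { case (k, v) => grouped.insert(k, v) }
    // grouped.toMap == Map("a" -> List(3, 1), "b" -> List(2))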


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17def8cc/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 77a2473..c51fb1d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,28 +33,20 @@ case class Aggregator[K, V, C] (
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
-    var kv: Product2[K, V] = null
-    val update = (hadValue: Boolean, oldValue: C) => {
-      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
-    }
+    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
     while (iter.hasNext) {
-      kv = iter.next()
-      combiners.changeValue(kv._1, update)
+      val kv = iter.next()
+      combiners.insert(kv._1, kv._2)
     }
     combiners.iterator
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
     //val combiners = new AppendOnlyMap[K, C]
-    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
-    var kc: (K, C) = null
-    val update = (hadValue: Boolean, oldValue: C) => {
-      if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
-    }
+    val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners)
     while (iter.hasNext) {
-      kc = iter.next()
-      combiners.changeValue(kc._1, update)
+      val kc = iter.next()
+      combiners.insert(kc._1, kc._2)
     }
     combiners.iterator
   }
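
To make the two rewritten call sites above concrete (illustration only, not part of the commit): for a sum-style aggregator, combineValuesByKey builds combiners from raw values, while combineCombinersByKey only ever merges combiners that already exist, which is why its "create" function is the identity (c: C) => c.

    // Hypothetical aggregator: V = Int per-record values, C = Long running totals.
    val createCombiner = (v: Int) => v.toLong
    val mergeValue     = (c: Long, v: Int) => c + v
    val mergeCombiners = (c1: Long, c2: Long) => c1 + c2

    // combineValuesByKey path: fold raw values for one key into a combiner.
    val values = Seq(1, 2, 3)
    val fromValues = values.tail.foldLeft(createCombiner(values.head))(mergeValue)        // 6L

    // combineCombinersByKey path: the inputs are already combiners, so the
    // create step is the identity and only mergeCombiners does any work.
    val partials = Seq(6L, 4L)
    val fromCombiners = partials.tail.foldLeft(identity(partials.head))(mergeCombiners)   // 10L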

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17def8cc/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 4c45a94..b93c60c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -100,52 +100,56 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override val partitioner = Some(part)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    def combine(x: Seq[ArrayBuffer[Any]], y: Seq[ArrayBuffer[Any]]) = {
-      x.zipAll(y, ArrayBuffer[Any](), ArrayBuffer[Any]()).map {
-        case (a, b) => a ++ b
-      }
-    }
-    //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-    val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combine)
+    //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
+    val combiners = createExternalMap(numRdds)
 
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
         rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
-          kv => addToMap(kv._1, kv._2, depNum)
+          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
         fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => addToMap(kv._1, kv._2, depNum)
+          kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
     }
+    new InterruptibleIterator(context, combiners.iterator)
+  }
 
-    def addToMap(key: K, value: Any, depNum: Int) {
-      def update(hadVal: Boolean, oldVal: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] = {
-        var newVal = oldVal
-        if (!hadVal){
-          newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
-        }
-        newVal(depNum) += value
-        newVal
+  def createExternalMap(numRdds:Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+    def createCombiner(v: CoGroupValue): CoGroupCombiner = {
+      val newCombiner = Array.fill(numRdds)(new CoGroup)
+      mergeValue(newCombiner, v)
+    }
+    def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = {
+      v match { case (value, depNum) => c(depNum) += value }
+      c
+    }
+    def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = {
+      c1.zipAll(c2, new CoGroup, new CoGroup).map {
+        case (v1, v2) => v1 ++ v2
       }
-      map.changeValue(key, update)
     }
-
-    new InterruptibleIterator(context, map.iterator)
+    new ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] (
+      createCombiner,mergeValue, mergeCombiners)
   }
 
   override def clearDependencies() {
     super.clearDependencies()
     rdds = null
   }
+
+  type CoGroup = ArrayBuffer[Any]
+  type CoGroupValue = (Any, Int)  // Int is dependency number
+  type CoGroupCombiner = Seq[CoGroup]
 }
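
The new createExternalMap above appends each incoming value to the per-dependency buffer named by its CoGroupValue index, and mergeCombiners zips partial combiners together buffer-by-buffer. A small script-style sketch of those three functions for two co-grouped RDDs (re-stating the commit's type aliases so it runs standalone):

    import scala.collection.mutable.ArrayBuffer

    type CoGroup         = ArrayBuffer[Any]
    type CoGroupValue    = (Any, Int)   // (value, dependency number)
    type CoGroupCombiner = Seq[CoGroup]
    val numRdds = 2

    def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = { c(v._2) += v._1; c }
    def createCombiner(v: CoGroupValue): CoGroupCombiner =
      mergeValue(Array.fill(numRdds)(new CoGroup), v)
    def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner =
      c1.zipAll(c2, new CoGroup, new CoGroup).map { case (a, b) => a ++ b }

    // One key seen twice in dependency 0 and once in dependency 1,
    // arriving as two partial combiners that are then merged:
    val left   = mergeValue(createCombiner(("a1", 0)), ("a2", 0))  // Seq(ArrayBuffer(a1, a2), ArrayBuffer())
    val right  = createCombiner(("b1", 1))                         // Seq(ArrayBuffer(), ArrayBuffer(b1))
    val merged = mergeCombiners(left, right)                       // Seq(ArrayBuffer(a1, a2), ArrayBuffer(b1))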

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17def8cc/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 857f8e3..1a57536 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -22,80 +22,86 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
 
 /**
- * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
- * function must be specified to merge values back into memory during read.
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
  */
-class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
-                                 memoryThresholdMB: Int = 1024)
-  extends Iterable[(K, V)] with Serializable {
+class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
+                                      mergeValue: (C, V) => C,
+                                      mergeCombiners: (C, C) => C,
+                                      memoryThresholdMB: Int = 1024)
+  extends Iterable[(K, C)] with Serializable {
 
-  var currentMap = new AppendOnlyMap[K, V]
-  var oldMaps = new ArrayBuffer[DiskKVIterator]
+  var currentMap = new AppendOnlyMap[K, C]
+  var oldMaps = new ArrayBuffer[DiskKCIterator]
 
-  def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
-    currentMap.changeValue(key, updateFunc)
+  def insert(key: K, value: V): Unit = {
+    currentMap.changeValue(key, updateFunction(value))
     val mapSize = SizeEstimator.estimate(currentMap)
     if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
       spill()
     }
   }
 
+  def updateFunction(value: V) : (Boolean, C) => C = {
+    (hadVal: Boolean, oldVal: C) =>
+      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+  }
+
   def spill(): Unit = {
     val file = File.createTempFile("external_append_only_map", "")  // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
     sortedMap.foreach { out.writeObject( _ ) }
     out.close()
-    currentMap = new AppendOnlyMap[K, V]
-    oldMaps.append(new DiskKVIterator(file))
+    currentMap = new AppendOnlyMap[K, C]
+    oldMaps.append(new DiskKCIterator(file))
   }
 
-  override def iterator: Iterator[(K, V)] = new ExternalIterator()
+  override def iterator: Iterator[(K, C)] = new ExternalIterator()
 
   /**
    *  An iterator that merges KV pairs from memory and disk in sorted order
    */
-  class ExternalIterator extends Iterator[(K, V)] {
+  class ExternalIterator extends Iterator[(K, C)] {
 
     // Order by increasing key hash value
-    implicit object KVOrdering extends Ordering[KVITuple] {
-      def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+    implicit object KVOrdering extends Ordering[KCITuple] {
+      def compare(a:KCITuple, b:KCITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
     }
-    val pq = mutable.PriorityQueue[KVITuple]()
-    val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
+    val pq = mutable.PriorityQueue[KCITuple]()
+    val inputStreams = Seq(new MemoryKCIterator(currentMap)) ++ oldMaps
     inputStreams.foreach { readFromIterator( _ ) }
 
     override def hasNext: Boolean = !pq.isEmpty
 
     // Combine all values from all input streams corresponding to the same key
-    override def next(): (K,V) = {
-      val minKVI = pq.dequeue()
-      var (minKey, minValue) = (minKVI.key, minKVI.value)
+    override def next(): (K, C) = {
+      val minKCI = pq.dequeue()
+      var (minKey, minCombiner) = (minKCI.key, minKCI.combiner)
       val minHash = minKey.hashCode()
-      readFromIterator(minKVI.iter)
+      readFromIterator(minKCI.iter)
 
-      var collidedKVI = ArrayBuffer[KVITuple]()
+      var collidedKCI = ArrayBuffer[KCITuple]()
       while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
-        val newKVI: KVITuple = pq.dequeue()
-        if (newKVI.key == minKey){
-          minValue = combineFunction(minValue, newKVI.value)
-          readFromIterator(newKVI.iter)
+        val newKCI: KCITuple = pq.dequeue()
+        if (newKCI.key == minKey){
+          minCombiner = mergeCombiners(minCombiner, newKCI.combiner)
+          readFromIterator(newKCI.iter)
         } else {
           // Collision
-          collidedKVI += newKVI
+          collidedKCI += newKCI
         }
       }
-      collidedKVI.foreach { pq.enqueue( _ ) }
-      (minKey, minValue)
+      collidedKCI.foreach { pq.enqueue( _ ) }
+      (minKey, minCombiner)
     }
 
     // Read from the given iterator until a key of different hash is retrieved,
-    // Add each KV pair read from this iterator to the heap
-    def readFromIterator(iter: Iterator[(K, V)]): Unit = {
+    // Add each KC pair read from this iterator to the heap
+    def readFromIterator(iter: Iterator[(K, C)]): Unit = {
       var minHash : Option[Int] = None
       while (iter.hasNext) {
-        val (k, v) = iter.next()
-        pq.enqueue(KVITuple(k, v, iter))
+        val (k, c) = iter.next()
+        pq.enqueue(KCITuple(k, c, iter))
         minHash match {
           case None => minHash = Some(k.hashCode())
           case Some(expectedHash) =>
@@ -106,19 +112,19 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
       }
     }
 
-    case class KVITuple(key:K, value:V, iter:Iterator[(K, V)])
+    case class KCITuple(key:K, combiner:C, iter:Iterator[(K, C)])
   }
 
-  class MemoryKVIterator(map: AppendOnlyMap[K, V]) extends Iterator[(K, V)] {
-    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+  class MemoryKCIterator(map: AppendOnlyMap[K, C]) extends Iterator[(K, C)] {
+    val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
     val it = sortedMap.iterator
     override def hasNext: Boolean = it.hasNext
-    override def next(): (K, V) = it.next()
+    override def next(): (K, C) = it.next()
   }
 
-  class DiskKVIterator(file: File) extends Iterator[(K, V)] {
+  class DiskKCIterator(file: File) extends Iterator[(K, C)] {
     val in = new ObjectInputStream(new FileInputStream(file))
-    var nextItem:(K, V) = _
+    var nextItem:(K, C) = _
     var eof = false
 
     override def hasNext: Boolean = {
@@ -126,7 +132,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
         return false
       }
       try {
-        nextItem = in.readObject().asInstanceOf[(K, V)]
+        nextItem = in.readObject().asInstanceOf[(K, C)]
       } catch {
         case e: EOFException =>
           eof = true
@@ -135,7 +141,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
       true
     }
 
-    override def next(): (K, V) = {
+    override def next(): (K, C) = {
       if (eof) {
         throw new NoSuchElementException
       }
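
For context on the ExternalIterator above: each input (the in-memory map plus every spilled file) is already sorted by key hash, and a priority queue merges them, folding together entries for the same key with mergeCombiners as they are dequeued. A stripped-down sketch of that merge over plain sorted iterators (illustration only: it orders by the key itself rather than its hash and ignores hash collisions and disk I/O):

    import scala.collection.mutable

    def mergeSortedStreams[K: Ordering, C](
        streams: Seq[Iterator[(K, C)]],
        mergeCombiners: (C, C) => C): Iterator[(K, C)] = new Iterator[(K, C)] {

      case class Head(key: K, combiner: C, iter: Iterator[(K, C)])
      // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest key first.
      implicit val ord: Ordering[Head] = Ordering.by[Head, K](_.key).reverse
      private val pq = mutable.PriorityQueue[Head]()

      private def refill(it: Iterator[(K, C)]): Unit =
        if (it.hasNext) { val (k, c) = it.next(); pq.enqueue(Head(k, c, it)) }
      streams.foreach(refill)

      def hasNext: Boolean = pq.nonEmpty

      def next(): (K, C) = {
        val first = pq.dequeue()
        refill(first.iter)
        var combined = first.combiner
        // Fold in every other stream's entry for the same key.
        while (pq.nonEmpty && pq.head.key == first.key) {
          val other = pq.dequeue()
          combined = mergeCombiners(combined, other.combiner)
          refill(other.iter)
        }
        (first.key, combined)
      }
    }

    // Usage: two sorted runs of (word, count) merge into one combined run.
    val merged = mergeSortedStreams(
      Seq(Iterator("a" -> 1, "c" -> 2), Iterator("a" -> 4, "b" -> 3)),
      (x: Int, y: Int) => x + y)
    // merged.toList == List(("a", 5), ("b", 3), ("c", 2))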