You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:26 UTC

[09/50] git commit: Provide for cases when mergeCombiners is not specified in ExternalAppendOnlyMap

Provide for cases when mergeCombiners is not specified in ExternalAppendOnlyMap


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/28685a48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/28685a48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/28685a48

Branch: refs/heads/master
Commit: 28685a482032f4b42cb46d1b24de1cc1dd1180c1
Parents: 17def8c
Author: Andrew Or <an...@gmail.com>
Authored: Wed Dec 25 15:16:57 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   2 +-
 .../spark/util/ExternalAppendOnlyMap.scala      | 187 ++++++++++++-------
 2 files changed, 121 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28685a48/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b93c60c..a2a3de7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -126,7 +126,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     new InterruptibleIterator(context, combiners.iterator)
   }
 
-  def createExternalMap(numRdds:Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+  private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
     def createCombiner(v: CoGroupValue): CoGroupCombiner = {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
       mergeValue(newCombiner, v)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28685a48/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 1a57536..e2205c6 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -18,11 +18,16 @@
 package org.apache.spark.util
 
 import java.io._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 
 /**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * A wrapper for SpillableAppendOnlyMap that handles two cases:
+ *
+ * (1)  If a mergeCombiners function is specified, merge values into combiners before
+ *      disk spill, as it is possible to merge the resulting combiners later
+ *
+ * (2)  Otherwise, group values of the same key together before disk spill, and merge
+ *      them into combiners only after reading them back from disk
  */
 class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
                                       mergeValue: (C, V) => C,
@@ -30,78 +35,103 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
                                       memoryThresholdMB: Int = 1024)
   extends Iterable[(K, C)] with Serializable {
 
-  var currentMap = new AppendOnlyMap[K, C]
-  var oldMaps = new ArrayBuffer[DiskKCIterator]
+  private val mergeBeforeSpill: Boolean = mergeCombiners != null
+
+  private val map: SpillableAppendOnlyMap[K, V, _, C] = {
+    if (mergeBeforeSpill) {
+      println("* Merge before spill *")
+      new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
+        mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
+    } else {
+      println("* Merge after spill *")
+      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
+        mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
+    }
+  }
+
+  def insert(key: K, value: V): Unit = map.insert(key, value)
+
+  override def iterator: Iterator[(K, C)] = map.iterator
+
+  private def combinerIdentity(combiner: C): C = combiner
+  private def createGroup(value: V): ArrayBuffer[V] = ArrayBuffer[V](value)
+  private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
+    group += value
+    group
+  }
+  private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
+    group1 ++= group2
+    group1
+  }
+  private def combineGroup(group: ArrayBuffer[V]): C = {
+    var combiner : Option[C] = None
+    group.foreach { v =>
+      combiner match {
+        case None => combiner = Some(createCombiner(v))
+        case Some(c) => combiner = Some(mergeValue(c, v))
+      }
+    }
+    combiner.get
+  }
+}
+
+/**
+ * An append-only map that spills sorted content to disk when the memory threshold
+ * is exceeded. A group with type M is an intermediate combiner, and shares the same
+ * type as either C or ArrayBuffer[V].
+ */
+class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
+                                          mergeValue: (M, V) => M,
+                                          mergeGroups: (M, M) => M,
+                                          createCombiner: M => C,
+                                          memoryThresholdMB: Int = 1024)
+  extends Iterable[(K, C)] with Serializable {
+
+  var currentMap = new AppendOnlyMap[K, M]
+  var oldMaps = new ArrayBuffer[DiskIterator]
 
   def insert(key: K, value: V): Unit = {
-    currentMap.changeValue(key, updateFunction(value))
+    def update(hadVal: Boolean, oldVal: M): M = {
+      if (hadVal) mergeValue(oldVal, value) else createGroup(value)
+    }
+    currentMap.changeValue(key, update)
     val mapSize = SizeEstimator.estimate(currentMap)
-    if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+    if (mapSize > 1024 * 10) {
       spill()
     }
   }
 
-  def updateFunction(value: V) : (Boolean, C) => C = {
-    (hadVal: Boolean, oldVal: C) =>
-      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
-  }
-
   def spill(): Unit = {
+    println("> SPILL <")
     val file = File.createTempFile("external_append_only_map", "")  // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
     sortedMap.foreach { out.writeObject( _ ) }
     out.close()
-    currentMap = new AppendOnlyMap[K, C]
-    oldMaps.append(new DiskKCIterator(file))
+    currentMap = new AppendOnlyMap[K, M]
+    oldMaps.append(new DiskIterator(file))
   }
 
   override def iterator: Iterator[(K, C)] = new ExternalIterator()
 
-  /**
-   *  An iterator that merges KV pairs from memory and disk in sorted order
-   */
+  // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
   class ExternalIterator extends Iterator[(K, C)] {
 
     // Order by increasing key hash value
-    implicit object KVOrdering extends Ordering[KCITuple] {
-      def compare(a:KCITuple, b:KCITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+    implicit object KVOrdering extends Ordering[KMITuple] {
+      def compare(a:KMITuple, b:KMITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
     }
-    val pq = mutable.PriorityQueue[KCITuple]()
-    val inputStreams = Seq(new MemoryKCIterator(currentMap)) ++ oldMaps
+    val pq = PriorityQueue[KMITuple]()
+    val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
     inputStreams.foreach { readFromIterator( _ ) }
 
-    override def hasNext: Boolean = !pq.isEmpty
-
-    // Combine all values from all input streams corresponding to the same key
-    override def next(): (K, C) = {
-      val minKCI = pq.dequeue()
-      var (minKey, minCombiner) = (minKCI.key, minKCI.combiner)
-      val minHash = minKey.hashCode()
-      readFromIterator(minKCI.iter)
-
-      var collidedKCI = ArrayBuffer[KCITuple]()
-      while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
-        val newKCI: KCITuple = pq.dequeue()
-        if (newKCI.key == minKey){
-          minCombiner = mergeCombiners(minCombiner, newKCI.combiner)
-          readFromIterator(newKCI.iter)
-        } else {
-          // Collision
-          collidedKCI += newKCI
-        }
-      }
-      collidedKCI.foreach { pq.enqueue( _ ) }
-      (minKey, minCombiner)
-    }
-
-    // Read from the given iterator until a key of different hash is retrieved,
-    // Add each KC pair read from this iterator to the heap
-    def readFromIterator(iter: Iterator[(K, C)]): Unit = {
+    // Read from the given iterator until a key of different hash is retrieved
+    def readFromIterator(iter: Iterator[(K, M)]): Unit = {
       var minHash : Option[Int] = None
       while (iter.hasNext) {
-        val (k, c) = iter.next()
-        pq.enqueue(KCITuple(k, c, iter))
+        val (k, m) = iter.next()
+        pq.enqueue(KMITuple(k, m, iter))
         minHash match {
           case None => minHash = Some(k.hashCode())
           case Some(expectedHash) =>
@@ -112,40 +142,63 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
       }
     }
 
-    case class KCITuple(key:K, combiner:C, iter:Iterator[(K, C)])
+    override def hasNext: Boolean = !pq.isEmpty
+
+    override def next(): (K, C) = {
+      val minKMI = pq.dequeue()
+      var (minKey, minGroup) = (minKMI.key, minKMI.group)
+      val minHash = minKey.hashCode()
+      readFromIterator(minKMI.iterator)
+
+      // Merge groups with the same key into minGroup
+      var collidedKMI = ArrayBuffer[KMITuple]()
+      while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
+        val newKMI = pq.dequeue()
+        if (newKMI.key == minKey) {
+          minGroup = mergeGroups(minGroup, newKMI.group)
+          readFromIterator(newKMI.iterator)
+        } else {
+          // Collision
+          collidedKMI += newKMI
+        }
+      }
+      collidedKMI.foreach { pq.enqueue( _ ) }
+      (minKey, createCombiner(minGroup))
+    }
+
+    case class KMITuple(key:K, group:M, iterator:Iterator[(K, M)])
   }
 
-  class MemoryKCIterator(map: AppendOnlyMap[K, C]) extends Iterator[(K, C)] {
+  // Iterate through (K, M) pairs in sorted order from the in-memory map
+  class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
     val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
     val it = sortedMap.iterator
     override def hasNext: Boolean = it.hasNext
-    override def next(): (K, C) = it.next()
+    override def next(): (K, M) = it.next()
   }
 
-  class DiskKCIterator(file: File) extends Iterator[(K, C)] {
+  // Iterate through (K, M) pairs in sorted order from an on-disk map
+  class DiskIterator(file: File) extends Iterator[(K, M)] {
     val in = new ObjectInputStream(new FileInputStream(file))
-    var nextItem:(K, C) = _
-    var eof = false
+    var nextItem: Option[(K, M)] = None
 
     override def hasNext: Boolean = {
-      if (eof) {
-        return false
-      }
       try {
-        nextItem = in.readObject().asInstanceOf[(K, C)]
+        nextItem = Some(in.readObject().asInstanceOf[(K, M)])
+        true
       } catch {
         case e: EOFException =>
-          eof = true
-          return false
+          nextItem = None
+          false
       }
-      true
     }
 
-    override def next(): (K, C) = {
-      if (eof) {
-        throw new NoSuchElementException
+    override def next(): (K, M) = {
+      nextItem match {
+        case Some(item) => item
+        case None =>
+          throw new NoSuchElementException
       }
-      nextItem
     }
   }
 }