You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:51 UTC

[34/50] git commit: Address Patrick's and Reynold's comments

Address Patrick's and Reynold's comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/83dfa166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/83dfa166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/83dfa166

Branch: refs/heads/master
Commit: 83dfa1666487a4772c95fea21fde0d47471e063d
Parents: 8bbe08b
Author: Andrew Or <an...@gmail.com>
Authored: Tue Dec 31 20:02:05 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Tue Dec 31 20:02:05 2013 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 120 +++++++++++--------
 1 file changed, 71 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/83dfa166/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 492b4fc..311405f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,8 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 import scala.reflect.ClassTag
 
@@ -53,7 +55,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
-        Predef.identity, serializer, diskBlockManager)
+        identity, serializer, diskBlockManager)
     } else {
       // Use ArrayBuffer[V] as the intermediate combiner
       val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
@@ -111,9 +113,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   import SpillableAppendOnlyMap._
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
-  private val oldMaps = new ArrayBuffer[DiskKGIterator]
+  private val spilledMaps = new ArrayBuffer[DiskIterator]
 
   private val memoryThresholdMB = {
+    // TODO: Turn this into a fraction of memory per reducer
     val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
     val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
     bufferSize * bufferPercent
@@ -152,31 +155,37 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       writer.close()
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, G]
-    oldMaps.append(new DiskKGIterator(file))
+    spilledMaps.append(new DiskIterator(file))
   }
 
   override def iterator: Iterator[(K, C)] = {
-    if (oldMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
+    if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
       currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
     } else {
       new ExternalIterator()
     }
   }
 
-  // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
+  /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */
   private class ExternalIterator extends Iterator[(K, C)] {
-    val mergeHeap = new PriorityQueue[KGITuple]
-    val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator))
 
-    // Invariant: size of mergeHeap == number of input streams
+    // A fixed-size queue that maintains a buffer for each stream we are currently merging
+    val mergeHeap = new PriorityQueue[StreamBuffer]
+
+    // Input streams are derived both from the in-memory map and spilled maps on disk
+    // The in-memory map is sorted in place, while the spilled maps are already in sorted order
+    val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
+
     inputStreams.foreach{ it =>
-      val kgPairs = readFromIterator(it)
-      mergeHeap.enqueue(KGITuple(it, kgPairs))
+      val kgPairs = getMorePairs(it)
+      mergeHeap.enqueue(StreamBuffer(it, kgPairs))
     }
 
-    // Read from the given iterator until a key of different hash is retrieved.
-    // The resulting ArrayBuffer includes this key, and is ordered by key hash.
-    def readFromIterator(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
+    /**
+     * Fetch from the given iterator until a key of different hash is retrieved. In the
+     * event of key hash collisions, this ensures no pairs are hidden from being merged.
+     */
+    def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
       val kgPairs = new ArrayBuffer[(K, G)]
       if (it.hasNext) {
         var kg = it.next()
@@ -190,20 +199,26 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       kgPairs
     }
 
-    // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
-    def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
-      kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+    /**
+     * If the given buffer contains a value for the given key, merge that value into
+     * baseGroup and remove the corresponding (K, G) pair from the buffer
+     */
+    def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = {
+      var i = 0
+      while (i < buffer.pairs.size) {
+        val (k, g) = buffer.pairs(i)
         if (k == key) {
-          kgi.pairs.remove(i)
+          buffer.pairs.remove(i)
           return mergeGroups(baseGroup, g)
         }
+        i += 1
       }
       baseGroup
     }
 
     override def hasNext: Boolean = {
-      mergeHeap.foreach{ kgi =>
-        if (!kgi.pairs.isEmpty) {
+      mergeHeap.foreach{ buffer =>
+        if (!buffer.pairs.isEmpty) {
           return true
         }
       }
@@ -211,66 +226,74 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     }
 
     override def next(): (K, C) = {
-      val minKGI = mergeHeap.dequeue()
-      val (minPairs, minHash) = (minKGI.pairs, minKGI.minHash)
+      // Select a return key from the StreamBuffer that holds the lowest key hash
+      val minBuffer = mergeHeap.dequeue()
+      val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
       if (minPairs.length == 0) {
-        // Should only happen when hasNext is false
+        // Should only happen when no other stream buffers have any pairs left
         throw new NoSuchElementException
       }
-
-      // Select a return key with the minimum hash
       var (minKey, minGroup) = minPairs.remove(0)
       assert(minKey.hashCode() == minHash)
 
-      // Merge all other KGITuple's with the same minHash
-      val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
-      while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
-        val newKGI = mergeHeap.dequeue()
-        minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
-        dequeuedKGI += newKGI
+      // For all other streams that may have this key (i.e. have the same minimum key hash),
+      // merge in the corresponding value (if any) from that stream
+      val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
+      while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
+        val newBuffer = mergeHeap.dequeue()
+        minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer)
+        mergedBuffers += newBuffer
       }
 
-      // Repopulate and add back all dequeued KGI to mergeHeap
-      dequeuedKGI.foreach { kgi =>
-        if (kgi.pairs.length == 0) {
-          kgi.pairs ++= readFromIterator(kgi.iterator)
+      // Repopulate each visited stream buffer and add it back to the merge heap
+      mergedBuffers.foreach { buffer =>
+        if (buffer.pairs.length == 0) {
+          buffer.pairs ++= getMorePairs(buffer.iterator)
         }
-        mergeHeap.enqueue(kgi)
+        mergeHeap.enqueue(buffer)
       }
 
       (minKey, createCombiner(minGroup))
     }
 
-    case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
-      extends Comparable[KGITuple] {
-
-      // Invariant: pairs are ordered by key hash
-      def minHash: Int = {
+    /**
+     * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
+     * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
+     * hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
+     *
+     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
+     */
+    case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+      extends Comparable[StreamBuffer] {
+
+      def minKeyHash: Int = {
         if (pairs.length > 0){
+          // pairs are already sorted by key hash
           pairs(0)._1.hashCode()
         } else {
           Int.MaxValue
         }
       }
 
-      override def compareTo(other: KGITuple): Int = {
-        // mutable.PriorityQueue dequeues the max, not the min
-        -minHash.compareTo(other.minHash)
+      override def compareTo(other: StreamBuffer): Int = {
+        // minus sign because mutable.PriorityQueue dequeues the max, not the min
+        -minKeyHash.compareTo(other.minKeyHash)
       }
     }
   }
 
   // Iterate through (K, G) pairs in sorted order from an on-disk map
-  private class DiskKGIterator(file: File) extends Iterator[(K, G)] {
-    val fstream = new FileInputStream(file)
-    val dstream = ser.deserializeStream(fstream)
+  private class DiskIterator(file: File) extends Iterator[(K, G)] {
+    val fileStream = new FileInputStream(file)
+    val bufferedStream = new FastBufferedInputStream(fileStream)
+    val deserializeStream = ser.deserializeStream(bufferedStream)
     var nextItem: Option[(K, G)] = None
     var eof = false
 
     def readNextItem(): Option[(K, G)] = {
       if (!eof) {
         try {
-          return Some(dstream.readObject().asInstanceOf[(K, G)])
+          return Some(deserializeStream.readObject().asInstanceOf[(K, G)])
         } catch {
           case e: EOFException =>
             eof = true
@@ -302,8 +325,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
 
     // TODO: Ensure this gets called even if the iterator isn't drained.
     def cleanup() {
-      fstream.close()
-      dstream.close()
+      deserializeStream.close()
       file.delete()
     }
   }