You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:53 UTC

[36/50] git commit: Simplify ExternalAppendOnlyMap on the assumption that the mergeCombiners function is specified

Simplify ExternalAppendOnlyMap on the assumption that the mergeCombiners function is specified


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/92c304fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/92c304fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/92c304fd

Branch: refs/heads/master
Commit: 92c304fd0321d77941f0b029dc7b7f61804d8bca
Parents: 3bc9e39
Author: Andrew Or <an...@gmail.com>
Authored: Wed Jan 1 11:42:33 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Jan 1 11:42:33 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |   3 +-
 .../util/collection/ExternalAppendOnlyMap.scala | 162 ++++++-------------
 .../collection/ExternalAppendOnlyMapSuite.scala |  23 ---
 3 files changed, 53 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/92c304fd/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 784c09e..c408d5f 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -71,8 +71,7 @@ case class Aggregator[K, V, C: ClassTag] (
       }
       combiners.iterator
     } else {
-      val combiners =
-        new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
+      val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
       while (iter.hasNext) {
         val kc = iter.next()
         combiners.insert(kc._1, kc._2)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/92c304fd/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 223fae1..9e147fe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -23,98 +23,40 @@ import java.util.Comparator
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import scala.reflect.ClassTag
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
 /**
- * A wrapper for SpillableAppendOnlyMap that handles two cases:
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
  *
- * (1)  If a mergeCombiners function is specified, merge values into combiners before disk
- *      spill, as it is possible to merge the resulting combiners later.
+ * This map takes two passes over the data:
+ *   (1) Values are merged into combiners, which are sorted and spilled to disk in as necessary.
+ *   (2) Combiners are read from disk and merged together
  *
- * (2)  Otherwise, group values of the same key together before disk spill, and merge them
- *      into combiners only after reading them back from disk.
+ * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum
+ * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an
+ * additional margin of safety. The second parameter is important for the following reason:
  *
- * In the latter case, values occupy much more space because they are not collapsed as soon
- * as they are inserted. This in turn leads to more disk spills, degrading performance.
- * For this reason, a mergeCombiners function should be specified if possible.
+ * If the spill threshold is set too high, the in-memory map may occupy more memory than is
+ * available, resulting in OOM. However, if the spill threshold is set too low, we spill
+ * frequently and incur unnecessary disk writes. This may lead to a performance regression
+ * compared to the normal case of using the non-spilling AppendOnlyMap.
  */
-private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
+
+private[spark] class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C,
     serializer: Serializer = SparkEnv.get.serializerManager.default,
     diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
-  extends Iterable[(K, C)] with Serializable {
-
-  private val mergeBeforeSpill: Boolean = mergeCombiners != null
-
-  private val map: SpillableAppendOnlyMap[K, V, _, C] = {
-    if (mergeBeforeSpill) {
-      new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
-        identity, serializer, diskBlockManager)
-    } else {
-      // Use ArrayBuffer[V] as the intermediate combiner
-      val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
-      val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
-        group += value
-      }
-      val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
-        group1 ++= group2
-      }
-      val combineGroup: (ArrayBuffer[V] => C) = group => {
-        var combiner : Option[C] = None
-        group.foreach { v =>
-          combiner match {
-            case None => combiner = Some(createCombiner(v))
-            case Some(c) => combiner = Some(mergeValue(c, v))
-          }
-        }
-        combiner.getOrElse(null.asInstanceOf[C])
-      }
-      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
-        mergeGroups, combineGroup, serializer, diskBlockManager)
-    }
-  }
-
-  def insert(key: K, value: V): Unit = map.insert(key, value)
-
-  override def iterator: Iterator[(K, C)] = map.iterator
-}
-
-/**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
- * A group is an intermediate combiner, with type G equal to either C or ArrayBuffer[V].
- *
- * This map takes two passes over the data:
- *   (1) Values are merged into groups, which are spilled to disk as necessary.
- *   (2) Groups are read from disk and merged into combiners, which are returned.
- *
- * If we never spill to disk, we avoid the second pass provided that groups G are already
- * combiners C.
- *
- * Note that OOM is still possible with the SpillableAppendOnlyMap. This may occur if the
- * collective G values do not fit into memory, or if the size estimation is not sufficiently
- * accurate. To account for the latter, `spark.shuffle.buffer.fraction` specifies an additional
- * margin of safety, while `spark.shuffle.buffer.mb` specifies the raw memory threshold.
- */
-private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
-    createGroup: V => G,
-    mergeValue: (G, V) => G,
-    mergeGroups: (G, G) => G,
-    createCombiner: G => C,
-    serializer: Serializer,
-    diskBlockManager: DiskBlockManager)
   extends Iterable[(K, C)] with Serializable with Logging {
 
-  import SpillableAppendOnlyMap._
+  import ExternalAppendOnlyMap._
 
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
+  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskIterator]
-
   private val memoryThresholdMB = {
     // TODO: Turn this into a fraction of memory per reducer
     val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
@@ -123,13 +65,13 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   }
   private val fileBufferSize =
     System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-  private val comparator = new KeyGroupComparator[K, G]
+  private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
   private var spillCount = 0
 
   def insert(key: K, value: V): Unit = {
-    val update: (Boolean, G) => G = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createGroup(value)
+    val update: (Boolean, C) => C = (hadVal, oldVal) => {
+      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
     currentMap.changeValue(key, update)
     if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
@@ -154,19 +96,19 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       // Partial failures cannot be tolerated; do not revert partial writes
       writer.close()
     }
-    currentMap = new SizeTrackingAppendOnlyMap[K, G]
+    currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskIterator(file))
   }
 
   override def iterator: Iterator[(K, C)] = {
-    if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
-      currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
+    if (spilledMaps.isEmpty) {
+      currentMap.iterator
     } else {
       new ExternalIterator()
     }
   }
 
-  /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */
+  /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -177,43 +119,43 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
 
     inputStreams.foreach{ it =>
-      val kgPairs = getMorePairs(it)
-      mergeHeap.enqueue(StreamBuffer(it, kgPairs))
+      val kcPairs = getMorePairs(it)
+      mergeHeap.enqueue(StreamBuffer(it, kcPairs))
     }
 
     /**
      * Fetch from the given iterator until a key of different hash is retrieved. In the
      * event of key hash collisions, this ensures no pairs are hidden from being merged.
      */
-    def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
-      val kgPairs = new ArrayBuffer[(K, G)]
+    def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+      val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
-        var kg = it.next()
-        kgPairs += kg
-        val minHash = kg._1.hashCode()
-        while (it.hasNext && kg._1.hashCode() == minHash) {
-          kg = it.next()
-          kgPairs += kg
+        var kc = it.next()
+        kcPairs += kc
+        val minHash = kc._1.hashCode()
+        while (it.hasNext && kc._1.hashCode() == minHash) {
+          kc = it.next()
+          kcPairs += kc
         }
       }
-      kgPairs
+      kcPairs
     }
 
     /**
      * If the given buffer contains a value for the given key, merge that value into
-     * baseGroup and remove the corresponding (K, G) pair from the buffer
+     * baseCombiner and remove the corresponding (K, C) pair from the buffer
      */
-    def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = {
+    def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
       var i = 0
       while (i < buffer.pairs.size) {
-        val (k, g) = buffer.pairs(i)
+        val (k, c) = buffer.pairs(i)
         if (k == key) {
           buffer.pairs.remove(i)
-          return mergeGroups(baseGroup, g)
+          return mergeCombiners(baseCombiner, c)
         }
         i += 1
       }
-      baseGroup
+      baseCombiner
     }
 
     override def hasNext: Boolean = {
@@ -233,7 +175,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         // Should only happen when no other stream buffers have any pairs left
         throw new NoSuchElementException
       }
-      var (minKey, minGroup) = minPairs.remove(0)
+      var (minKey, minCombiner) = minPairs.remove(0)
       assert(minKey.hashCode() == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
@@ -241,7 +183,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
       while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
         val newBuffer = mergeHeap.dequeue()
-        minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer)
+        minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
         mergedBuffers += newBuffer
       }
 
@@ -253,7 +195,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         mergeHeap.enqueue(buffer)
       }
 
-      (minKey, createCombiner(minGroup))
+      (minKey, minCombiner)
     }
 
     /**
@@ -263,7 +205,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
      *
      * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
      */
-    case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+    case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def minKeyHash: Int = {
@@ -282,18 +224,18 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     }
   }
 
-  // Iterate through (K, G) pairs in sorted order from an on-disk map
-  private class DiskIterator(file: File) extends Iterator[(K, G)] {
+  // Iterate through (K, C) pairs in sorted order from an on-disk map
+  private class DiskIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)
-    var nextItem: Option[(K, G)] = None
+    var nextItem: Option[(K, C)] = None
     var eof = false
 
-    def readNextItem(): Option[(K, G)] = {
+    def readNextItem(): Option[(K, C)] = {
       if (!eof) {
         try {
-          return Some(deserializeStream.readObject().asInstanceOf[(K, G)])
+          return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
         } catch {
           case e: EOFException =>
             eof = true
@@ -312,7 +254,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       }
     }
 
-    override def next(): (K, G) = {
+    override def next(): (K, C) = {
       nextItem match {
         case Some(item) =>
           nextItem = None
@@ -331,10 +273,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   }
 }
 
-private[spark] object SpillableAppendOnlyMap {
-  private class KeyGroupComparator[K, G] extends Comparator[(K, G)] {
-    def compare(kg1: (K, G), kg2: (K, G)): Int = {
-      kg1._1.hashCode().compareTo(kg2._1.hashCode())
+private[spark] object ExternalAppendOnlyMap {
+  private class KCComparator[K, C] extends Comparator[(K, C)] {
+    def compare(kc1: (K, C), kc2: (K, C)): Int = {
+      kc1._1.hashCode().compareTo(kc2._1.hashCode())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/92c304fd/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index baf94b4..a18d466 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -229,27 +229,4 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
       }
     }
   }
-
-  test("spilling with no mergeCombiners function") {
-    System.setProperty("spark.shuffle.buffer.mb", "1")
-    System.setProperty("spark.shuffle.buffer.fraction", "0.05")
-
-    // combineByKey - should spill exactly 11 times
-    val _createCombiner: Int => ArrayBuffer[Int] = i => ArrayBuffer[Int](i)
-    val _mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buf, i) => buf += i
-    val rdd = sc.parallelize(0 until 10000).map(i => (i/4, i))
-    val result = rdd.combineByKey[ArrayBuffer[Int]](_createCombiner, _mergeValue, null,
-      new HashPartitioner(1), mapSideCombine=false).collect()
-
-    // result should be the same as groupByKey
-    assert(result.length == 2500)
-    result.foreach { case(i, seq) =>
-      i match {
-        case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
-        case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
-        case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
-        case _ =>
-      }
-    }
-  }
 }