Posted to commits@spark.apache.org by ad...@apache.org on 2014/07/27 20:20:32 UTC

git commit: SPARK-2684: Update ExternalAppendOnlyMap to take an iterator as input

Repository: spark
Updated Branches:
  refs/heads/master 3a69c72e5 -> 985705301


SPARK-2684: Update ExternalAppendOnlyMap to take an iterator as input

This decreases object allocation by reusing a single "update" closure across all entries fed to map.changeValue, instead of building a new closure for each inserted pair.
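
To make that concrete: the old insert(key, value) built a fresh "update" closure capturing the value argument on every call, so inserting N pairs allocated N function objects. insertAll builds the closure once and routes the current pair through a mutable variable instead. A minimal self-contained sketch of the reuse pattern (plain Scala with illustrative names, not the Spark API itself):

    import scala.collection.mutable

    // The old style allocated one closure per pair; reusing a single closure
    // that reads a mutable curEntry allocates exactly one for the whole run.
    def drain(pairs: Iterator[(String, Int)]): mutable.Map[String, Int] = {
      val map = mutable.Map.empty[String, Int]

      var curEntry: (String, Int) = null
      // Built once; stands in for the createCombiner/mergeValue pair.
      val update: Option[Int] => Int = {
        case Some(old) => old + curEntry._2   // mergeValue
        case None      => curEntry._2         // createCombiner
      }

      while (pairs.hasNext) {
        curEntry = pairs.next()
        map(curEntry._1) = update(map.get(curEntry._1))
      }
      map
    }

For example, drain(Iterator(("a", 1), ("a", 2), ("b", 3))) returns Map("a" -> 3, "b" -> 3) while allocating only one update closure for the whole iterator.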

Author: Matei Zaharia <ma...@databricks.com>

Closes #1607 from mateiz/spark-2684 and squashes the following commits:

b7d89e6 [Matei Zaharia] Add insertAll for Iterables too, and fix some code style
561fc97 [Matei Zaharia] Update ExternalAppendOnlyMap to take an iterator as input


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98570530
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98570530
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98570530

Branch: refs/heads/master
Commit: 985705301e5e55de14b00ad8ce3143e91aae185d
Parents: 3a69c72
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Jul 27 11:20:20 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Jul 27 11:20:20 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  5 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  7 +-
 .../util/collection/ExternalAppendOnlyMap.scala | 77 +++++++++++++-------
 .../collection/ExternalAppendOnlyMapSuite.scala | 17 +++--
 4 files changed, 64 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98570530/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1d64057..ff0ca11 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      while (iter.hasNext) {
-        val pair = iter.next()
-        combiners.insert(pair._1, pair._2)
-      }
+      combiners.insertAll(iter)
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
       Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)

http://git-wip-us.apache.org/repos/asf/spark/blob/98570530/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7d96089..6388ef8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -154,11 +154,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case (it, depNum) =>
-        while (it.hasNext) {
-          val kv = it.next()
-          map.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+      for ((it, depNum) <- rddIterators) {
+        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled

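One subtlety in the CoGroupedRDD change above: Iterator.map is lazy, so wrapping each pair in a CoGroupValue happens on the fly as insertAll drains the iterator, and no intermediate collection is materialized. A quick standalone illustration of that laziness (simplified stand-in types, not the Spark classes):

    // Iterator.map is lazy: the wrapping runs only as the consumer pulls
    // elements, interleaving production with consumption.
    val source = Iterator.tabulate(3) { i =>
      println(s"producing $i")
      (i, s"value$i")
    }
    val depNum = 0
    val wrapped = source.map { case (k, v) => (k, (v, depNum)) }
    // Nothing has been produced yet; work happens during iteration below.
    wrapped.foreach { case (k, cg) => println(s"consuming $k -> $cg") }
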
http://git-wip-us.apache.org/repos/asf/spark/blob/98570530/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c22bb8d..6f263c3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -110,42 +110,69 @@ class ExternalAppendOnlyMap[K, V, C](
 
   /**
    * Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V): Unit = {
+    insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
    *
-   * If the underlying map is about to grow, check if the global pool of shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
    * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
+    // An update function for the map that we reuse across entries to avoid allocating
+    // a new closure each time
+    var curEntry: Product2[K, V] = null
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
     }
-    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-      val mapSize = currentMap.estimateSize()
-      var shouldSpill = false
-      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-      // Atomically check whether there is sufficient memory in the global pool for
-      // this map to grow and, if possible, allocate the required amount
-      shuffleMemoryMap.synchronized {
-        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-        val availableMemory = maxMemoryThreshold -
-          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
-
-        // Assume map growth factor is 2x
-        shouldSpill = availableMemory < mapSize * 2
-        if (!shouldSpill) {
-          shuffleMemoryMap(threadId) = mapSize * 2
+
+    while (entries.hasNext) {
+      curEntry = entries.next()
+      if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+        val mapSize = currentMap.estimateSize()
+        var shouldSpill = false
+        val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+        // Atomically check whether there is sufficient memory in the global pool for
+        // this map to grow and, if possible, allocate the required amount
+        shuffleMemoryMap.synchronized {
+          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+          val availableMemory = maxMemoryThreshold -
+            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+          // Assume map growth factor is 2x
+          shouldSpill = availableMemory < mapSize * 2
+          if (!shouldSpill) {
+            shuffleMemoryMap(threadId) = mapSize * 2
+          }
+        }
+        // Do not synchronize spills
+        if (shouldSpill) {
+          spill(mapSize)
         }
       }
-      // Do not synchronize spills
-      if (shouldSpill) {
-        spill(mapSize)
-      }
+      currentMap.changeValue(curEntry._1, update)
+      numPairsInMemory += 1
     }
-    currentMap.changeValue(key, update)
-    numPairsInMemory += 1
+  }
+
+  /**
+   * Insert the given iterable of keys and values into the map.
+   *
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
+   * enough room for this to happen. If so, allocate the memory required to grow the map;
+   * otherwise, spill the in-memory map to disk.
+   *
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
+   */
+  def insertAll(entries: Iterable[Product2[K, V]]): Unit = {
+    insertAll(entries.iterator)
   }
 
   /**

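The spill decision that insertAll now performs per entry is worth spelling out: a thread may grow its map only if it can reserve twice the map's current estimated size (the assumed 2x growth factor) from the global pool, counting every other thread's reservation against maxMemoryThreshold; otherwise it spills. A standalone sketch of that accounting (shuffleMemoryMap and the threshold here are simplified stand-ins for the SparkEnv state):

    import scala.collection.mutable

    // Stand-ins for SparkEnv.get.shuffleMemoryMap and maxMemoryThreshold.
    val shuffleMemoryMap = mutable.Map[Long, Long]()  // threadId -> bytes held
    val maxMemoryThreshold = 1L << 30                 // e.g. a 1 GiB pool

    // Try to reserve room for the map to double; true means "must spill".
    def shouldSpill(threadId: Long, mapSize: Long): Boolean =
      shuffleMemoryMap.synchronized {
        val previouslyOccupied = shuffleMemoryMap.get(threadId)
        // Pool space left, not counting this thread's own old reservation.
        val available = maxMemoryThreshold -
          (shuffleMemoryMap.values.sum - previouslyOccupied.getOrElse(0L))
        if (available < mapSize * 2) {
          true                                        // not enough room
        } else {
          shuffleMemoryMap(threadId) = mapSize * 2    // reserve doubled size
          false
        }
      }
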
http://git-wip-us.apache.org/repos/asf/spark/blob/98570530/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 4288229..0b7ad18 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -63,12 +63,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
 
-    map.insert(1, 10)
-    map.insert(2, 20)
-    map.insert(3, 30)
-    map.insert(1, 100)
-    map.insert(2, 200)
-    map.insert(1, 1000)
+    map.insertAll(Seq(
+      (1, 10),
+      (2, 20),
+      (3, 30),
+      (1, 100),
+      (2, 200),
+      (1, 1000)))
     val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
@@ -282,7 +283,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(w1.hashCode === w2.hashCode)
     }
 
-    (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
       map.insert(w2, w1)
@@ -355,7 +356,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
       createCombiner, mergeValue, mergeCombiners)
 
-    (1 to 100000).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(i => (i, i)))
     map.insert(null.asInstanceOf[Int], 1)
     map.insert(1, null.asInstanceOf[Int])
     map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])
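
For completeness, here is how the two insertAll overloads added in this patch can be exercised together, mirroring the test helpers above (a usage sketch, not taken verbatim from the suite):

    import scala.collection.mutable.ArrayBuffer

    val createCombiner = (v: Int) => ArrayBuffer(v)
    val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
    val mergeCombiners =
      (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2

    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
      createCombiner, mergeValue, mergeCombiners)

    map.insertAll(Seq((1, 10), (1, 100), (2, 20)))    // Iterable overload
    map.insertAll(Iterator((3, 30), (3, 300)))        // Iterator overload

    val grouped = map.iterator.map { case (k, buf) => (k, buf.toSet) }.toMap
    // grouped == Map(1 -> Set(10, 100), 2 -> Set(20), 3 -> Set(30, 300))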