You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:26:05 UTC

[48/50] git commit: Address Patrick's and Reynold's comments

Address Patrick's and Reynold's comments

Aside from trivial formatting changes, use nulls instead of Options for
DiskMapIterator, and add documentation for spark.shuffle.externalSorting
and spark.shuffle.memoryFraction.

Also, set spark.shuffle.memoryFraction to 0.3 and spark.storage.memoryFraction to 0.6.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e4c51d21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e4c51d21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e4c51d21

Branch: refs/heads/master
Commit: e4c51d21135978908f7f4a46683f70ef98b720ec
Parents: 372a533
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jan 10 15:09:51 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Jan 10 15:09:51 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  3 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../util/collection/ExternalAppendOnlyMap.scala | 89 +++++++++++---------
 docs/configuration.md                           | 24 +++++-
 5 files changed, 73 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e4c51d21/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 08a96b0..8b30cd4 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+  private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     if (!externalSorting) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e4c51d21/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b7c7773..a73714a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,8 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
-    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e4c51d21/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca..56cae6f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e4c51d21/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 50f0535..e3bcd89 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -71,21 +71,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {
-    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
-  // How many inserts into this map before tracking its shuffle memory usage
-  private val initialInsertThreshold =
-    sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
+  // Number of pairs in the in-memory map
+  private var numPairsInMemory = 0
+
+  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
+  private val trackMemoryThreshold = 1000
+
+  // How many times we have spilled so far
+  private var spillCount = 0
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
-  private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
+  private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
-  private var insertCount = 0
-  private var spillCount = 0
 
   /**
    * Insert the given key and value into the map.
@@ -94,14 +97,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
-   * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
   def insert(key: K, value: V) {
-    insertCount += 1
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
-    if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
       val mapSize = currentMap.estimateSize()
       var shouldSpill = false
       val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -114,7 +116,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
         val availableMemory = maxMemoryThreshold -
           (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
 
-        // Assume map grow factor is 2x
+        // Assume map growth factor is 2x
         shouldSpill = availableMemory < mapSize * 2
         if (!shouldSpill) {
           shuffleMemoryMap(threadId) = mapSize * 2
@@ -126,6 +128,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       }
     }
     currentMap.changeValue(key, update)
+    numPairsInMemory += 1
   }
 
   /**
@@ -133,7 +136,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     val writer =
@@ -157,9 +160,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     shuffleMemoryMap.synchronized {
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
-    insertCount = 0
+    numPairsInMemory = 0
   }
 
+  /**
+   * Return an iterator that merges the in-memory map with the spilled maps.
+   * If no spill has occurred, simply return the in-memory map's iterator.
+   */
   override def iterator: Iterator[(K, C)] = {
     if (spilledMaps.isEmpty) {
       currentMap.iterator
@@ -168,7 +175,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
+  /**
+   * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
+   */
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -179,7 +188,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     val sortedMap = currentMap.destructiveSortedIterator(comparator)
     val inputStreams = Seq(sortedMap) ++ spilledMaps
 
-    inputStreams.foreach{ it =>
+    inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
       mergeHeap.enqueue(StreamBuffer(it, kcPairs))
     }
@@ -187,6 +196,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     /**
      * Fetch from the given iterator until a key of different hash is retrieved. In the
      * event of key hash collisions, this ensures no pairs are hidden from being merged.
+     * Assume the given iterator is in sorted order.
      */
     def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
@@ -219,17 +229,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       baseCombiner
     }
 
-    override def hasNext: Boolean = {
-      mergeHeap.foreach{ buffer =>
-        if (!buffer.pairs.isEmpty) {
-          return true
-        }
-      }
-      false
-    }
+    /**
+     * Return true if there exists an input stream that still has unvisited pairs
+     */
+    override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
 
+    /**
+     * Select a key with the minimum hash, then combine all values with the same key from all input streams.
+     */
     override def next(): (K, C) = {
-      // Select a return key from the StreamBuffer that holds the lowest key hash
+      // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
       if (minPairs.length == 0) {
@@ -285,45 +294,43 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  // Iterate through (K, C) pairs in sorted order from an on-disk map
+  /**
+   * An iterator that returns (K, C) pairs in sorted order from an on-disk map
+   */
   private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)
-    var nextItem: Option[(K, C)] = None
+    var nextItem: (K, C) = null
     var eof = false
 
-    def readNextItem(): Option[(K, C)] = {
+    def readNextItem(): (K, C) = {
       if (!eof) {
         try {
-          return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
+          return deserializeStream.readObject().asInstanceOf[(K, C)]
         } catch {
           case e: EOFException =>
             eof = true
             cleanup()
         }
       }
-      None
+      null
     }
 
     override def hasNext: Boolean = {
-      nextItem match {
-        case Some(item) => true
-        case None =>
-          nextItem = readNextItem()
-          nextItem.isDefined
+      if (nextItem == null) {
+        nextItem = readNextItem()
       }
+      nextItem != null
     }
 
     override def next(): (K, C) = {
-      nextItem match {
-        case Some(item) =>
-          nextItem = None
-          item
-        case None =>
-          val item = readNextItem()
-          item.getOrElse(throw new NoSuchElementException)
+      val item = if (nextItem == null) readNextItem() else nextItem
+      if (item == null) {
+        throw new NoSuchElementException
       }
+      nextItem = null
+      item
     }
 
     // TODO: Ensure this gets called even if the iterator isn't drained.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e4c51d21/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 6717757..c115849 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -104,14 +104,25 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.storage.memoryFraction</td>
-  <td>0.66</td>
+  <td>0.6</td>
   <td>
     Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+    generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
     it if you configure your own old generation size.
   </td>
 </tr>
 <tr>
+  <td>spark.shuffle.memoryFraction</td>
+  <td>0.3</td>
+  <td>
+    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
+    <code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
+    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+    begin to spill to disk. If spills occur often, consider increasing this value at the expense of
+    <code>spark.storage.memoryFraction</code>.
+  </td>
+</tr>
+<tr>
   <td>spark.mesos.coarse</td>
   <td>false</td>
   <td>
@@ -377,6 +388,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>spark.shuffle.externalSorting</td>
+  <td>true</td>
+  <td>
+    If set to "true", spills in-memory maps used for shuffles to disk when a memory threshold is reached. This
+    threshold is specified by <code>spark.shuffle.memoryFraction</code>. Enable this especially for memory-intensive
+    applications.
+  </td>
+</tr>
+<tr>
   <td>spark.speculation</td>
   <td>false</td>
   <td>