Posted to commits@spark.apache.org by an...@apache.org on 2014/11/19 19:45:50 UTC

spark git commit: [SPARK-4480] Avoid many small spills in external data structures (1.1)

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 e22a75923 -> 16bf5f3d1


[SPARK-4480] Avoid many small spills in external data structures (1.1)

This is the branch-1.1 version of #3353. This requires a separate PR because the code in master has been refactored a little to eliminate duplicate code. I have tested this on a standalone cluster. The goal is to merge this into 1.1.1.

Author: Andrew Or <an...@databricks.com>

Closes #3354 from andrewor14/avoid-small-spills-1.1 and squashes the following commits:

f2e552c [Andrew Or] Fix tests
7012595 [Andrew Or] Avoid many small spills


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16bf5f3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16bf5f3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16bf5f3d

Branch: refs/heads/branch-1.1
Commit: 16bf5f3d17624db2a96c921fe8a1e153cdafb06c
Parents: e22a759
Author: Andrew Or <an...@databricks.com>
Authored: Wed Nov 19 10:45:42 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Nov 19 10:45:42 2014 -0800

----------------------------------------------------------------------
 .../spark/shuffle/ShuffleMemoryManager.scala    |  7 +++++--
 .../util/collection/ExternalAppendOnlyMap.scala | 20 +++++++++++++++-----
 .../spark/util/collection/ExternalSorter.scala  | 18 ++++++++++++++----
 .../util/collection/ExternalSorterSuite.scala   |  4 +++-
 4 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
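
The core of the fix: each spillable collection now starts its memory claim at a nonzero
initial threshold instead of 0, so it can grow to a few megabytes before it ever asks the
shared shuffle pool for room. A simplified sketch of the grow-or-spill decision (not the
exact branch-1.1 code; tryToAcquire is assumed to behave as the 1.1 ShuffleMemoryManager
does, returning the number of bytes actually granted):

    // Simplified sketch of the logic in ExternalAppendOnlyMap / ExternalSorter.
    // Before this patch myMemoryThreshold started at 0, so under memory pressure a
    // near-empty collection could fail its very first request and spill a tiny file.
    private[spark] abstract class SpillSketch(
        shuffleMemoryManager: ShuffleMemoryManager,
        initialMemoryThreshold: Long) {

      private var myMemoryThreshold = initialMemoryThreshold  // 5 MB by default after this patch

      def maybeSpill(estimatedSize: Long): Boolean = {
        if (estimatedSize < myMemoryThreshold) return false
        // Ask for enough to roughly double our budget relative to the current size
        val amountToRequest = 2 * estimatedSize - myMemoryThreshold
        myMemoryThreshold += shuffleMemoryManager.tryToAcquire(amountToRequest)
        // Spill only if the pool could not grant enough room to keep growing
        estimatedSize >= myMemoryThreshold
      }
    }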


http://git-wip-us.apache.org/repos/asf/spark/blob/16bf5f3d/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index ee91a36..c746e13 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 
 /**
  * Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
   }
 }
 
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
   /**
    * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
    * of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
 }

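The new DEFAULT_INITIAL_MEMORY_THRESHOLD gives every spilling collection a 5 MB head start
before it competes for the shared pool. A hedged usage sketch of overriding it through the
new configuration key (key and default are from this commit; the app name and the 1 MB value
are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // The threshold is read in bytes; the updated tests below set it to 512 to force
    // spills, while most jobs would keep the 5 MB default.
    val conf = new SparkConf()
      .setAppName("spill-threshold-example")  // illustrative name, not from the commit
      .set("spark.shuffle.spill.initialMemoryThreshold", (1024 * 1024).toString)  // 1 MB
    val sc = new SparkContext(conf)
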
http://git-wip-us.apache.org/repos/asf/spark/blob/16bf5f3d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 96697d2..5619b30 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /**
  * :: DeveloperApi ::
@@ -81,8 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -236,8 +243,11 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     elementsRead = 0
     _memoryBytesSpilled += mapSize

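The release call above hands back only the portion of the claim that was actually borrowed
from the shuffle pool, since the initial threshold was never acquired from it. A small worked
example with assumed numbers (not taken from the commit):

    // Assumed sizes, for illustration only.
    val initialMemoryThreshold = 5L * 1024 * 1024   // the new 5 MB default
    var myMemoryThreshold      = 48L * 1024 * 1024  // claim grown to 48 MB before this spill

    // Only the bytes acquired from ShuffleMemoryManager are returned:
    val toRelease = myMemoryThreshold - initialMemoryThreshold  // 43 MB
    // shuffleMemoryManager.release(toRelease)                  // as in the hunk above
    myMemoryThreshold = initialMemoryThreshold                  // keep the 5 MB head start
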
http://git-wip-us.apache.org/repos/asf/spark/blob/16bf5f3d/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d414ce3..a049746 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark._
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockObjectWriter, BlockId}
 
 /**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -285,8 +292,11 @@ private[spark] class ExternalSorter[K, V, C](
     }
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     _memoryBytesSpilled += memorySize
     elementsRead = 0

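ExternalSorter repeats the same bookkeeping as ExternalAppendOnlyMap, which is the duplication
the commit message says master has already refactored away. A hypothetical sketch of how that
shared logic could be factored into one trait (names and shape are illustrative, not the
master code):

    // Hypothetical shared trait; both collections would mix this in instead of
    // repeating the release/reset logic in each spill() implementation.
    private[spark] trait SpillThresholdBookkeeping {
      protected def initialMemoryThreshold: Long
      protected def releaseToPool(numBytes: Long): Unit   // delegates to ShuffleMemoryManager.release
      protected var myMemoryThreshold: Long = 0L          // concrete classes reset this to initialMemoryThreshold

      /** Return the borrowed portion to the pool and keep the initial allowance. */
      protected def releaseMemoryAfterSpill(): Unit = {
        releaseToPool(myMemoryThreshold - initialMemoryThreshold)
        myMemoryThreshold = initialMemoryThreshold
      }
    }
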
http://git-wip-us.apache.org/repos/asf/spark/blob/16bf5f3d/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index f26e40f..f4db3ff 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     }
 
     sorter2.stop()
- }
+  }
 }

