Posted to commits@spark.apache.org by an...@apache.org on 2014/11/20 03:07:35 UTC

spark git commit: [SPARK-4480] Avoid many small spills in external data structures

Repository: spark
Updated Branches:
  refs/heads/master 73fedf5a6 -> 0eb4a7fb0


[SPARK-4480] Avoid many small spills in external data structures

**Summary.** Currently, we may spill many small files in `ExternalAppendOnlyMap` and `ExternalSorter`. The underlying root cause of this is summarized in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). This PR does not address this root cause, but simply provides the guarantee that we never spill the in-memory data structure if its size is less than a configurable threshold of 5MB. This config is not documented because we don't want users to set it themselves, and it is not hard-coded because we need to change it in tests.
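
The mechanics are easiest to see in isolation. Below is a hypothetical standalone model of the spill decision (names mirror `Spillable`, but this is an illustration, not the patched trait itself): because `myMemoryThreshold` now starts at the 5MB floor instead of 0, no collection smaller than the floor can ever trigger a spill.
```
// Hypothetical standalone model of the new spill policy; names mirror
// Spillable, but this is an illustration, not the actual trait code.
object SpillPolicyModel {
  val initialMemoryThreshold: Long = 5L * 1024 * 1024 // the new 5MB floor
  var myMemoryThreshold: Long = initialMemoryThreshold

  // Decide whether a collection of `currentMemory` bytes should spill,
  // given that the shuffle memory pool grants us `granted` more bytes.
  def shouldSpill(currentMemory: Long, granted: Long): Boolean = {
    if (currentMemory < myMemoryThreshold) {
      false // below the floor: never spill, however little memory is free
    } else {
      myMemoryThreshold += granted
      myMemoryThreshold <= currentMemory // spill only if we still cannot grow
    }
  }
}

// SpillPolicyModel.shouldSpill(4 * 1024, granted = 0)          // false
// SpillPolicyModel.shouldSpill(6L * 1024 * 1024, granted = 0)  // true
```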

**Symptom.** Each spill is orders of magnitude smaller than 1MB, and there are many of them. In environments where the ulimit is low, this frequently causes the "too many open files" exceptions observed in [SPARK-3633](https://issues.apache.org/jira/browse/SPARK-3633).
```
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292769 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4760 B to disk (292770 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4520 B to disk (292771 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4560 B to disk (292772 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292773 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4784 B to disk (292774 spills so far)
```

**Reproduction.** I ran the following on a small 4-node cluster with 512MB executors. Note that the back-to-back shuffle here is necessary for reasons described in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). The second shuffle uses a `reduceByKey` because it performs a map-side combine.
```
sc.parallelize(1 to 100000000, 100)
  .map { i => (i, i) }   // 100 million (Int, Int) pairs
  .groupByKey()          // first shuffle: no map-side combine
  .reduceByKey(_ ++ _)   // second shuffle: performs a map-side combine
  .count()
```
Before the change, I observed that each thread may spill up to 1000 times, and the size of each spill is on the order of 10KB. After the change, each thread spills at most 20 times in the worst case, and the size of each spill is on the order of 1MB.
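
Tests that need to exercise spilling on tiny datasets can still lower the floor explicitly, which is exactly what the suite changes in this commit do; a minimal sketch:
```
import org.apache.spark.SparkConf

// Force frequent spills even for tiny collections in tests: shrink the
// shuffle memory pool and drop the (undocumented) initial threshold to 512 B.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.001")
  .set("spark.shuffle.spill.initialMemoryThreshold", "512")
```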

Author: Andrew Or <an...@databricks.com>

Closes #3353 from andrewor14/avoid-small-spills and squashes the following commits:

49f380f [Andrew Or] Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills
27d6966 [Andrew Or] Merge branch 'master' of github.com:apache/spark into avoid-small-spills
f4736e3 [Andrew Or] Fix tests
a919776 [Andrew Or] Avoid many small spills


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eb4a7fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eb4a7fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eb4a7fb

Branch: refs/heads/master
Commit: 0eb4a7fb0fa1fa56677488cbd74eb39e65317621
Parents: 73fedf5
Author: Andrew Or <an...@databricks.com>
Authored: Wed Nov 19 18:07:27 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Nov 19 18:07:27 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/Spillable.scala       | 28 +++++++++++---------
 .../util/collection/ExternalSorterSuite.scala   |  2 ++
 2 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0eb4a7fb/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index cb73b37..9f54312 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -24,10 +24,7 @@ import org.apache.spark.SparkEnv
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] {
-
-  this: Logging =>
-
+private[spark] trait Spillable[C] extends Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *
@@ -45,15 +42,21 @@ private[spark] trait Spillable[C] {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // What threshold of elementsRead we start estimating collection size at
+  // Threshold for `elementsRead` before we start tracking this collection's memory usage
   private[this] val trackMemoryThreshold = 1000
 
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  // Exposed for testing
+  private[this] val initialMemoryThreshold: Long =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
+
+  // Threshold for this collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private[this] var myMemoryThreshold = initialMemoryThreshold
+
   // Number of elements read from input since last spill
   private[this] var _elementsRead = 0L
 
-  // How much of the shared memory pool this collection has claimed
-  private[this] var myMemoryThreshold = 0L
-
   // Number of bytes spilled in total
   private[this] var _memoryBytesSpilled = 0L
 
@@ -102,8 +105,9 @@ private[spark] trait Spillable[C] {
    * Release our memory back to the shuffle pool so that other threads can grab it.
    */
   private def releaseMemoryForThisThread(): Unit = {
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+    myMemoryThreshold = initialMemoryThreshold
   }
 
   /**
@@ -114,7 +118,7 @@ private[spark] trait Spillable[C] {
   @inline private def logSpillage(size: Long) {
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-        .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-            _spillCount, if (_spillCount > 1) "s" else ""))
+      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+        _spillCount, if (_spillCount > 1) "s" else ""))
   }
 }
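
One subtlety in `releaseMemoryForThisThread` above: the initial 5MB floor was never actually acquired from the pool, so it must be subtracted on release. A worked example with illustrative values (not from the commit):
```
// Illustrative values only: suppose the threshold grew from the 5MB floor
// to 37MB, i.e. 32MB was actually acquired from shuffleMemoryManager.
val initialMemoryThreshold = 5L * 1024 * 1024
val myMemoryThreshold = 37L * 1024 * 1024

// Release exactly what was borrowed; releasing myMemoryThreshold itself
// would hand the pool 5MB it never lent us.
val toRelease = myMemoryThreshold - initialMemoryThreshold // 32MB
```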

http://git-wip-us.apache.org/repos/asf/spark/blob/0eb4a7fb/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index f26e40f..3cb42d4 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 

