You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/12/02 04:51:15 UTC

spark git commit: [SPARK-12081] Make unified memory manager work with small heaps

Repository: spark
Updated Branches:
  refs/heads/master 1ce4adf55 -> d96f8c997


[SPARK-12081] Make unified memory manager work with small heaps

The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOMs in such cases.

**New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081).

Author: Andrew Or <an...@databricks.com>

Closes #10081 from andrewor14/unified-memory-small-heaps.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d96f8c99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d96f8c99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d96f8c99

Branch: refs/heads/master
Commit: d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6
Parents: 1ce4adf
Author: Andrew Or <an...@databricks.com>
Authored: Tue Dec 1 19:51:12 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Dec 1 19:51:12 2015 -0800

----------------------------------------------------------------------
 .../spark/memory/UnifiedMemoryManager.scala     | 22 ++++++++++++++++----
 .../memory/UnifiedMemoryManagerSuite.scala      | 20 ++++++++++++++++++
 docs/configuration.md                           |  4 ++--
 docs/tuning.md                                  |  2 +-
 4 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d96f8c99/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 8be5b05..48b4e23 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
  * either side can borrow memory from the other.
  *
- * The region shared between execution and storage is a fraction of the total heap space
+ * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
  * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
  * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
  * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
@@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  */
 private[spark] class UnifiedMemoryManager private[memory] (
     conf: SparkConf,
-    maxMemory: Long,
+    val maxMemory: Long,
     private val storageRegionSize: Long,
     numCores: Int)
   extends MemoryManager(
@@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
 
 object UnifiedMemoryManager {
 
+  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
+  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
+  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
+  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
+  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
+
   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
     val maxMemory = getMaxMemory(conf)
     new UnifiedMemoryManager(
@@ -144,8 +150,16 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
+      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val minSystemMemory = reservedMemory * 1.5
+    if (systemMemory < minSystemMemory) {
+      throw new IllegalArgumentException(s"System memory $systemMemory must " +
+        s"be at least $minSystemMemory. Please use a larger heap size.")
+    }
+    val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
-    (systemMaxMemory * memoryFraction).toLong
+    (usableMemory * memoryFraction).toLong
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d96f8c99/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 8cebe81..e97c898 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -182,4 +182,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assertEnsureFreeSpaceCalled(ms, 850L)
   }
 
+  test("small heap") {
+    val systemMemory = 1024 * 1024
+    val reservedMemory = 300 * 1024
+    val memoryFraction = 0.8
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", memoryFraction.toString)
+      .set("spark.testing.memory", systemMemory.toString)
+      .set("spark.testing.reservedMemory", reservedMemory.toString)
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+    val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
+    assert(mm.maxMemory === expectedMaxMemory)
+
+    // Try using a system memory that's too small
+    val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+    val exception = intercept[IllegalArgumentException] {
+      UnifiedMemoryManager(conf2, numCores = 1)
+    }
+    assert(exception.getMessage.contains("larger heap size"))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d96f8c99/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 741d6b2..c39b489 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -719,8 +719,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.fraction</code></td>
   <td>0.75</td>
   <td>
-    Fraction of the heap space used for execution and storage. The lower this is, the more
-    frequently spills and cached data eviction occur. The purpose of this config is to set
+    Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
+    more frequently spills and cached data eviction occur. The purpose of this config is to set
     aside memory for internal metadata, user data structures, and imprecise size estimation
     in the case of sparse, unusually large records. Leaving this at the default value is
     recommended. For more detail, see <a href="tuning.html#memory-management-overview">

http://git-wip-us.apache.org/repos/asf/spark/blob/d96f8c99/docs/tuning.md
----------------------------------------------------------------------
diff --git a/docs/tuning.md b/docs/tuning.md
index a8fe7a4..e73ed69 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -114,7 +114,7 @@ variety of workloads without requiring user expertise of how memory is divided i
 Although there are two relevant configurations, the typical user should not need to adjust them
 as the default values are applicable to most workloads:
 
-* `spark.memory.fraction` expresses the size of `M` as a fraction of the total JVM heap space
+* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
 (default 0.75). The rest of the space (25%) is reserved for user data structures, internal
 metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
 large records.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org