You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/06 14:46:41 UTC

[spark] branch master updated: [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 737f089  [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
737f089 is described below

commit 737f08949adecbae37bb92dfad71ae5f3a82cbee
Author: SongYadong <so...@zte.com.cn>
AuthorDate: Sun Jan 6 08:46:20 2019 -0600

    [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
    
    ## What changes were proposed in this pull request?
    
    When acquiring unroll memory from `StaticMemoryManager`, let it fail fast if required space exceeds memory limit, just like acquiring storage memory.
    I think this may reduce some computation and memory evicting costs especially when required space(`numBytes`) is very big.
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #23426 from SongYadong/acquireUnrollMemory_fail_fast.
    
    Authored-by: SongYadong <so...@zte.com.cn>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../apache/spark/memory/StaticMemoryManager.scala  | 27 ++++++++++++++--------
 .../apache/spark/storage/MemoryStoreSuite.scala    |  4 ++--
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 8286087..0fd349d 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -80,16 +80,23 @@ private[spark] class StaticMemoryManager(
       memoryMode: MemoryMode): Boolean = synchronized {
     require(memoryMode != MemoryMode.OFF_HEAP,
       "StaticMemoryManager does not support off-heap unroll memory")
-    val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
-    val freeMemory = onHeapStorageMemoryPool.memoryFree
-    // When unrolling, we will use all of the existing free memory, and, if necessary,
-    // some extra space freed from evicting cached blocks. We must place a cap on the
-    // amount of memory to be evicted by unrolling, however, otherwise unrolling one
-    // big block can blow away the entire cache.
-    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
-    // Keep it within the range 0 <= X <= maxNumBytesToFree
-    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    if (numBytes > maxOnHeapStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+        s"memory limit ($maxOnHeapStorageMemory bytes)")
+      false
+    } else {
+      val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+      val freeMemory = onHeapStorageMemoryPool.memoryFree
+      // When unrolling, we will use all of the existing free memory, and, if necessary,
+      // some extra space freed from evicting cached blocks. We must place a cap on the
+      // amount of memory to be evicted by unrolling, however, otherwise unrolling one
+      // big block can blow away the entire cache.
+      val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
+      // Keep it within the range 0 <= X <= maxNumBytesToFree
+      val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
+      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    }
   }
 
   private[memory]
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072..baff672 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -291,11 +291,11 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("b3")
     putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
 
-    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    // Unroll huge block with not enough space. This should fail.
     val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
-    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org