You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/06 14:46:41 UTC
[spark] branch master updated: [SPARK-26527][CORE] Let
acquireUnrollMemory fail fast if required space exceeds memory limit
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 737f089 [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
737f089 is described below
commit 737f08949adecbae37bb92dfad71ae5f3a82cbee
Author: SongYadong <so...@zte.com.cn>
AuthorDate: Sun Jan 6 08:46:20 2019 -0600
[SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
## What changes were proposed in this pull request?
When acquiring unroll memory from `StaticMemoryManager`, let it fail fast if required space exceeds memory limit, just like acquiring storage memory.
I think this may reduce some computation and memory evicting costs especially when required space(`numBytes`) is very big.
## How was this patch tested?
Existing unit tests.
Closes #23426 from SongYadong/acquireUnrollMemory_fail_fast.
Authored-by: SongYadong <so...@zte.com.cn>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../apache/spark/memory/StaticMemoryManager.scala | 27 ++++++++++++++--------
.../apache/spark/storage/MemoryStoreSuite.scala | 4 ++--
2 files changed, 19 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 8286087..0fd349d 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -80,16 +80,23 @@ private[spark] class StaticMemoryManager(
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
- val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
- val freeMemory = onHeapStorageMemoryPool.memoryFree
- // When unrolling, we will use all of the existing free memory, and, if necessary,
- // some extra space freed from evicting cached blocks. We must place a cap on the
- // amount of memory to be evicted by unrolling, however, otherwise unrolling one
- // big block can blow away the entire cache.
- val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
- // Keep it within the range 0 <= X <= maxNumBytesToFree
- val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
- onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+ if (numBytes > maxOnHeapStorageMemory) {
+ // Fail fast if the block simply won't fit
+ logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+ s"memory limit ($maxOnHeapStorageMemory bytes)")
+ false
+ } else {
+ val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+ val freeMemory = onHeapStorageMemoryPool.memoryFree
+ // When unrolling, we will use all of the existing free memory, and, if necessary,
+ // some extra space freed from evicting cached blocks. We must place a cap on the
+ // amount of memory to be evicted by unrolling, however, otherwise unrolling one
+ // big block can blow away the entire cache.
+ val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
+ // Keep it within the range 0 <= X <= maxNumBytesToFree
+ val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
+ onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+ }
}
private[memory]
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072..baff672 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -291,11 +291,11 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("b3")
putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
- // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ // Unroll huge block with not enough space. This should fail.
val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
- assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org