You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/19 03:47:03 UTC
spark git commit: [SPARK-21090][CORE] Optimize the unified memory
manager code
Repository: spark
Updated Branches:
refs/heads/master f913f158e -> 112bd9bfc
[SPARK-21090][CORE] Optimize the unified memory manager code
## What changes were proposed in this pull request?
1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the `maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this PR,it will same as ON_HEAP Memory Mode.
Because when acquire memory is between `maxOffHeapStorageMemory` and `maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than `maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail fast.
2. Borrow memory from execution, `numBytes` modified to `numBytes - storagePool.memoryFree` will be more reasonable.
Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary borrowed `numBytes` from execution
## How was this patch tested?
added unit test case
Author: liuxian <li...@zte.com.cn>
Closes #18296 from 10110346/wip-lx-0614.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/112bd9bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/112bd9bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/112bd9bf
Branch: refs/heads/master
Commit: 112bd9bfc5b9729f6f86518998b5d80c5e79fe5e
Parents: f913f15
Author: liuxian <li...@zte.com.cn>
Authored: Mon Jun 19 11:46:58 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jun 19 11:46:58 2017 +0800
----------------------------------------------------------------------
.../spark/memory/UnifiedMemoryManager.scala | 5 +--
.../spark/memory/MemoryManagerSuite.scala | 2 +-
.../memory/UnifiedMemoryManagerSuite.scala | 32 ++++++++++++++++++++
3 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/112bd9bf/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index fea2808..df19355 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
- maxOffHeapMemory)
+ maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
@@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] (
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
- val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
+ val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
+ numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/112bd9bf/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index eb2b3ff..85eeb50 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
evictBlocksToFreeSpaceCalled.set(numBytesToFree)
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
- mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
+ mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
numBytesToFree
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/112bd9bf/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index c821054..02b04cd 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
mm.invokePrivate[Unit](assertInvariants())
}
+ test("not enough free memory in the storage pool --OFF_HEAP") {
+ val conf = new SparkConf()
+ .set("spark.memory.offHeap.size", "1000")
+ .set("spark.testing.memory", "1000")
+ .set("spark.memory.offHeap.enabled", "true")
+ val taskAttemptId = 0L
+ val mm = UnifiedMemoryManager(conf, numCores = 1)
+ val ms = makeMemoryStore(mm)
+ val memoryMode = MemoryMode.OFF_HEAP
+
+ assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.executionMemoryUsed === 400L)
+
+ // Fail fast
+ assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
+ assert(mm.storageMemoryUsed === 0L)
+
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
+ assert(mm.storageMemoryUsed === 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+
+ // Borrow 50 from execution memory
+ assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.storageMemoryUsed === 550L)
+
+ // Borrow 50 from execution memory and evict 50 to free space
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
+ assertEvictBlocksToFreeSpaceCalled(ms, 50)
+ assert(mm.storageMemoryUsed === 600L)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org