You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/08 15:41:40 UTC
spark git commit: [SPARK-21860][CORE] Improve memory reuse for heap
memory in `HeapMemoryAllocator`
Repository: spark
Updated Branches:
refs/heads/master a75f92717 -> 76e019d9b
[SPARK-21860][CORE] Improve memory reuse for heap memory in `HeapMemoryAllocator`
## What changes were proposed in this pull request?
In `HeapMemoryAllocator`, memory is allocated from a pool, and the key of the pool is the requested memory size.
However, some memory sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same, because we allocate memory in multiples of 8 bytes.
By using the size rounded up to a multiple of 8 bytes as the pool key, we can improve memory reuse.
## How was this patch tested?
Existing tests and added unit tests
Author: liuxian <li...@zte.com.cn>
Closes #19077 from 10110346/headmemoptimize.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e019d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e019d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e019d9
Branch: refs/heads/master
Commit: 76e019d9bdcdca176c79c1cd71ddbf496333bf93
Parents: a75f927
Author: liuxian <li...@zte.com.cn>
Authored: Thu Feb 8 23:41:30 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 8 23:41:30 2018 +0800
----------------------------------------------------------------------
.../unsafe/memory/HeapMemoryAllocator.java | 18 +++++++++-------
.../apache/spark/unsafe/PlatformUtilSuite.java | 22 ++++++++++++++++++++
2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index a9603c1..2733760 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -46,9 +46,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
- if (shouldPool(size)) {
+ int numWords = (int) ((size + 7) / 8);
+ long alignedSize = numWords * 8L;
+ assert (alignedSize >= size);
+ if (shouldPool(alignedSize)) {
synchronized (this) {
- final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+ final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public class HeapMemoryAllocator implements MemoryAllocator {
return memory;
}
}
- bufferPoolsBySize.remove(size);
+ bufferPoolsBySize.remove(alignedSize);
}
}
}
- long[] array = new long[(int) ((size + 7) / 8)];
+ long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public class HeapMemoryAllocator implements MemoryAllocator {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);
- if (shouldPool(size)) {
+ long alignedSize = ((size + 7) / 8) * 8;
+ if (shouldPool(alignedSize)) {
synchronized (this) {
- LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+ LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
- bufferPoolsBySize.put(size, pool);
+ bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 6285483..71c53d3 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.unsafe;
+import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -134,4 +135,25 @@ public class PlatformUtilSuite {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}
+
+ @Test
+ public void heapMemoryReuse() {
+ MemoryAllocator heapMem = new HeapMemoryAllocator();
+ // The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,allocate new memory every time.
+ MemoryBlock onheap1 = heapMem.allocate(513);
+ Object obj1 = onheap1.getBaseObject();
+ heapMem.free(onheap1);
+ MemoryBlock onheap2 = heapMem.allocate(514);
+ Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+ // The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+ // reuse the previous memory which has released.
+ MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+ Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+ Object obj3 = onheap3.getBaseObject();
+ heapMem.free(onheap3);
+ MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+ Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+ Assert.assertEquals(obj3, onheap4.getBaseObject());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org