You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/08 15:41:40 UTC

spark git commit: [SPARK-21860][CORE] Improve memory reuse for heap memory in `HeapMemoryAllocator`

Repository: spark
Updated Branches:
  refs/heads/master a75f92717 -> 76e019d9b


[SPARK-21860][CORE] Improve memory reuse for heap memory in `HeapMemoryAllocator`

## What changes were proposed in this pull request?
In `HeapMemoryAllocator`, memory is allocated from a pool, and the key of the pool is the memory size.
However, some memory sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same, because we allocate memory in multiples of 8 bytes.
In this case, we can improve memory reuse.

## How was this patch tested?
Existing tests and added unit tests

Author: liuxian <li...@zte.com.cn>

Closes #19077 from 10110346/headmemoptimize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e019d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e019d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e019d9

Branch: refs/heads/master
Commit: 76e019d9bdcdca176c79c1cd71ddbf496333bf93
Parents: a75f927
Author: liuxian <li...@zte.com.cn>
Authored: Thu Feb 8 23:41:30 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 8 23:41:30 2018 +0800

----------------------------------------------------------------------
 .../unsafe/memory/HeapMemoryAllocator.java      | 18 +++++++++-------
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 22 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index a9603c1..2733760 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -46,9 +46,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool != null) {
           while (!pool.isEmpty()) {
             final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public class HeapMemoryAllocator implements MemoryAllocator {
               return memory;
             }
           }
-          bufferPoolsBySize.remove(size);
+          bufferPoolsBySize.remove(alignedSize);
         }
       }
     }
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
     MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public class HeapMemoryAllocator implements MemoryAllocator {
     long[] array = (long[]) memory.obj;
     memory.setObjAndOffset(null, 0);
 
-    if (shouldPool(size)) {
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool == null) {
           pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
         }
         pool.add(new WeakReference<>(array));
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 6285483..71c53d3 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.unsafe;
 
+import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
@@ -134,4 +135,25 @@ public class PlatformUtilSuite {
       MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     MemoryAllocator.UNSAFE.free(offheap);
   }
+
+  @Test
+  public void heapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,allocate new memory every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // reuse the previous memory which has released.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org