You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:52:36 UTC

carbondata git commit: [CARBONDATA-2184]Improve memory reuse for heap memory in `HeapMemoryAllocator`

Repository: carbondata
Updated Branches:
  refs/heads/master 2f926c034 -> d0858b73e


[CARBONDATA-2184]Improve memory reuse for heap memory in `HeapMemoryAllocator`

The description in [SPARK-21860|https://issues.apache.org/jira/browse/SPARK-21860]:
In `HeapMemoryAllocator`, memory is allocated from a pool, and the key of the pool is the memory size.
Actually, some memory sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same, because we allocate memory in multiples of 8 bytes.
In this case, we can improve memory reuse.

This closes #1982


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d0858b73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d0858b73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d0858b73

Branch: refs/heads/master
Commit: d0858b73ee3c552020f6bbdaa876d810b1c324ba
Parents: 2f926c0
Author: Zhang Zhichao <44...@qq.com>
Authored: Sun Feb 18 00:55:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Feb 28 00:51:47 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 12 ++++
 .../core/memory/HeapMemoryAllocator.java        | 52 +++++++++-----
 .../carbondata/core/util/CarbonProperties.java  | 19 ++++++
 .../core/memory/MemoryAllocatorUnitTest.java    | 71 ++++++++++++++++++++
 4 files changed, 139 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6e6482d..0a06abc 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1598,6 +1598,18 @@ public final class CarbonCommonConstants {
       "carbon.query.validate.directqueryondatamap";
   public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";
 
+  /**
+   * If the heap memory allocations of the given size is greater or equal than this value,
+   * it should go through the pooling mechanism.
+   * But if set this size to -1, it should not go through the pooling mechanism.
+   * Default value is 1048576(1MB, the same as Spark).
+   * Unit: byte.
+   */
+  @CarbonProperty
+  public static final String CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES =
+      "carbon.heap.memory.pooling.threshold.bytes";
+  public static final String CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT = "1048576";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index 5862933..242995b 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -23,16 +23,27 @@ import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.concurrent.GuardedBy;
 
+import org.apache.carbondata.core.util.CarbonProperties;
+
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
  */
 public class HeapMemoryAllocator implements MemoryAllocator {
 
-  @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<MemoryBlock>>>
+  @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<long[]>>>
       bufferPoolsBySize = new HashMap<>();
 
-  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+  private int poolingThresholdBytes;
+  private boolean shouldPooling = true;
+
+  public HeapMemoryAllocator() {
+    poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
+    // if set 'poolingThresholdBytes' to -1, it should not go through the pooling mechanism.
+    if (poolingThresholdBytes == -1) {
+      shouldPooling = false;
+    }
+  }
 
   /**
    * Returns true if allocations of the given size should go through the pooling mechanism and
@@ -40,42 +51,53 @@ public class HeapMemoryAllocator implements MemoryAllocator {
    */
   private boolean shouldPool(long size) {
     // Very small allocations are less likely to benefit from pooling.
-    return size >= POOLING_THRESHOLD_BYTES;
+    return shouldPooling && (size >= poolingThresholdBytes);
   }
 
   @Override public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
               // reuse this MemoryBlock
               memory.setFreedStatus(false);
               return memory;
             }
           }
-          bufferPoolsBySize.remove(size);
+          bufferPoolsBySize.remove(alignedSize);
         }
       }
     }
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
     return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
   }
 
   @Override public void free(MemoryBlock memory) {
     final long size = memory.size();
-    if (shouldPool(size)) {
+
+    // As an additional layer of defense against use-after-free bugs, we mutate the
+    // MemoryBlock to null out its reference to the long[] array.
+    long[] array = (long[]) memory.obj;
+    memory.setObjAndOffset(null, 0);
+
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool == null) {
           pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
         }
-        pool.add(new WeakReference<>(memory));
+        pool.add(new WeakReference<>(array));
       }
     }
     memory.setFreedStatus(true);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 3dc7b8f..667c45c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1351,4 +1351,23 @@ public final class CarbonProperties {
         unsafeSortStorageMemory + "");
   }
 
+  /**
+   * Get the heap memory pooling threshold bytes.
+   */
+  public int getHeapMemoryPoolingThresholdBytes() {
+    int thresholdSize;
+    try {
+      thresholdSize = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
+              CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
+    } catch (NumberFormatException exc) {
+      LOGGER.error(
+          "The heap memory pooling threshold bytes is invalid. Using the default value "
+              + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+      thresholdSize = Integer.parseInt(
+          CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+    }
+    return thresholdSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
new file mode 100644
index 0000000..df1e103
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public class MemoryAllocatorUnitTest {
+
+  @Test
+  public void testHeapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than 1024 * 1024,
+    // allocate new memory every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than 1024 * 1024,
+    // reuse the previous memory which has released.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
+
+  @Test
+  public void testHeapMemoryNotPool() {
+    // not pool
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, "-1");
+
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertNotEquals(obj3, onheap4.getBaseObject());
+  }
+}