You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:52:36 UTC
carbondata git commit: [CARBONDATA-2184]Improve memory reuse for heap
memory in `HeapMemoryAllocator`
Repository: carbondata
Updated Branches:
refs/heads/master 2f926c034 -> d0858b73e
[CARBONDATA-2184]Improve memory reuse for heap memory in `HeapMemoryAllocator`
The description in [SPARK-21860|https://issues.apache.org/jira/browse/SPARK-21860]:
In `HeapMemoryAllocator`, memory is allocated from a pool whose key is the requested memory size.
However, some memory sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same, because memory is allocated in multiples of 8 bytes.
Keying the pool by the 8-byte-aligned size therefore improves memory reuse.
This closes #1982
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d0858b73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d0858b73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d0858b73
Branch: refs/heads/master
Commit: d0858b73ee3c552020f6bbdaa876d810b1c324ba
Parents: 2f926c0
Author: Zhang Zhichao <44...@qq.com>
Authored: Sun Feb 18 00:55:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Feb 28 00:51:47 2018 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 12 ++++
.../core/memory/HeapMemoryAllocator.java | 52 +++++++++-----
.../carbondata/core/util/CarbonProperties.java | 19 ++++++
.../core/memory/MemoryAllocatorUnitTest.java | 71 ++++++++++++++++++++
4 files changed, 139 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6e6482d..0a06abc 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1598,6 +1598,18 @@ public final class CarbonCommonConstants {
"carbon.query.validate.directqueryondatamap";
public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";
+ /**
+ * Property key for the heap memory pooling threshold, in bytes.
+ * Heap memory allocations whose size, rounded up to a multiple of 8 bytes, is greater than
+ * or equal to this value go through the pooling mechanism; smaller ones are always newly
+ * allocated.
+ * Setting this property to -1 disables the pooling mechanism entirely.
+ * Default value is 1048576 (1MB, the same as Spark).
+ * Unit: byte.
+ */
+ @CarbonProperty
+ public static final String CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES =
+ "carbon.heap.memory.pooling.threshold.bytes";
+ /** Default value of CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES: 1048576 bytes (1MB). */
+ public static final String CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT = "1048576";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index 5862933..242995b 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -23,16 +23,27 @@ import java.util.LinkedList;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
+import org.apache.carbondata.core.util.CarbonProperties;
+
/**
* Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
* A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
*/
public class HeapMemoryAllocator implements MemoryAllocator {
- @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<MemoryBlock>>>
+  /**
+   * Released arrays available for reuse, keyed by their 8-byte-aligned size in bytes. The arrays
+   * are held through weak references, so the garbage collector may still reclaim them; a cleared
+   * reference is simply skipped when allocating.
+   */
+  @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<long[]>>>
bufferPoolsBySize = new HashMap<>();
- private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+  /** Minimum aligned allocation size, in bytes, that goes through the pool. */
+  private final int poolingThresholdBytes;
+
+  /** False when pooling has been disabled by configuring a negative threshold. */
+  private final boolean shouldPooling;
+
+  /**
+   * Creates an allocator whose pooling threshold is read once, at construction time, from
+   * {@code CarbonProperties#getHeapMemoryPoolingThresholdBytes()}.
+   */
+  public HeapMemoryAllocator() {
+    poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
+    // -1 is the documented value that disables pooling. Any other negative value is treated the
+    // same way: otherwise 'size >= poolingThresholdBytes' would hold for every size and even
+    // tiny allocations would be pooled.
+    shouldPooling = poolingThresholdBytes >= 0;
+  }
/**
* Returns true if allocations of the given size should go through the pooling mechanism and
@@ -40,42 +51,53 @@ public class HeapMemoryAllocator implements MemoryAllocator {
*/
private boolean shouldPool(long size) {
// Very small allocations are less likely to benefit from pooling.
+ // Callers in this class pass the size already rounded up to a multiple of 8 bytes; pooling
+ // can also be switched off entirely by configuration (shouldPooling == false).
- return size >= POOLING_THRESHOLD_BYTES;
+ return shouldPooling && (size >= poolingThresholdBytes);
}
+ /**
+ * Allocates a heap-backed MemoryBlock whose reported size is exactly {@code size} bytes.
+ * The request is rounded up to whole 8-byte words. When pooling applies to that aligned size,
+ * a previously freed array of the same aligned size is reused if one is still reachable;
+ * otherwise a new long[] is allocated.
+ */
@Override public MemoryBlock allocate(long size) throws OutOfMemoryError {
- if (shouldPool(size)) {
+ // Round up to whole 8-byte words: sizes that differ only within the last word
+ // (e.g. 1025..1032 bytes) map to the same pool key, which improves reuse.
+ // NOTE(review): the (int) cast overflows past Integer.MAX_VALUE words (~16GB, the limit
+ // stated in the class Javadoc); callers presumably stay below that - confirm.
+ int numWords = (int) ((size + 7) / 8);
+ long alignedSize = numWords * 8L;
+ assert (alignedSize >= size);
+ if (shouldPool(alignedSize)) {
synchronized (this) {
- final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
- final WeakReference<MemoryBlock> blockReference = pool.pop();
- final MemoryBlock memory = blockReference.get();
- if (memory != null) {
- assert (memory.size() == size);
+ final WeakReference<long[]> arrayReference = pool.pop();
+ final long[] array = arrayReference.get();
+ if (array != null) {
+ assert (array.length * 8L >= size);
+ // Wrap the pooled array in a fresh MemoryBlock that reports the caller's requested
+ // size, which may be smaller than the array's aligned capacity.
+ MemoryBlock memory = new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
// reuse this MemoryBlock
memory.setFreedStatus(false);
return memory;
}
}
- bufferPoolsBySize.remove(size);
+ // Every weak reference in this bucket had been cleared; drop the now-empty bucket.
+ bufferPoolsBySize.remove(alignedSize);
}
}
}
- long[] array = new long[(int) ((size + 7) / 8)];
+ long[] array = new long[numWords];
return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
}
@Override public void free(MemoryBlock memory) {
final long size = memory.size();
- if (shouldPool(size)) {
+
+ // As an additional layer of defense against use-after-free bugs, we mutate the
+ // MemoryBlock to null out its reference to the long[] array.
+ long[] array = (long[]) memory.obj;
+ memory.setObjAndOffset(null, 0);
+
+ long alignedSize = ((size + 7) / 8) * 8;
+ if (shouldPool(alignedSize)) {
synchronized (this) {
- LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
- bufferPoolsBySize.put(size, pool);
+ bufferPoolsBySize.put(alignedSize, pool);
}
- pool.add(new WeakReference<>(memory));
+ pool.add(new WeakReference<>(array));
}
}
memory.setFreedStatus(true);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 3dc7b8f..667c45c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1351,4 +1351,23 @@ public final class CarbonProperties {
unsafeSortStorageMemory + "");
}
+ /**
+ * Get the heap memory pooling threshold bytes.
+ */
+ public int getHeapMemoryPoolingThresholdBytes() {
+ int thresholdSize;
+ try {
+ thresholdSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
+ CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
+ } catch (NumberFormatException exc) {
+ LOGGER.error(
+ "The heap memory pooling threshold bytes is invalid. Using the default value "
+ + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+ thresholdSize = Integer.parseInt(
+ CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+ }
+ return thresholdSize;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0858b73/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
new file mode 100644
index 0000000..df1e103
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public class MemoryAllocatorUnitTest {
+
+  /**
+   * With the default threshold (1MB), allocations below it are never pooled, while a freed
+   * array above it is reused for a later request that rounds up to the same 8-byte-aligned size.
+   */
+  @Test
+  public void testHeapMemoryReuse() {
+    // Pin the default threshold so this test does not depend on what other tests configured.
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
+            CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+
+    // The size is less than 1024 * 1024, so new memory is allocated every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotSame(obj1, onheap2.getBaseObject());
+
+    // The size is greater than 1024 * 1024, and 1024 * 1024 + 1 and 1024 * 1024 + 7 round up
+    // to the same aligned size, so the previously released array is reused. Holding obj3
+    // strongly keeps the pooled weak reference from being cleared in between.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(1024 * 1024 + 1, onheap3.size());
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(1024 * 1024 + 7, onheap4.size());
+    Assert.assertSame(obj3, onheap4.getBaseObject());
+  }
+
+  /** With the threshold set to -1, no allocation is pooled, whatever its size. */
+  @Test
+  public void testHeapMemoryNotPool() {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, "-1");
+    try {
+      MemoryAllocator heapMem = new HeapMemoryAllocator();
+      MemoryBlock onheap1 = heapMem.allocate(513);
+      Object obj1 = onheap1.getBaseObject();
+      heapMem.free(onheap1);
+      MemoryBlock onheap2 = heapMem.allocate(514);
+      Assert.assertNotSame(obj1, onheap2.getBaseObject());
+
+      MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+      Assert.assertEquals(1024 * 1024 + 1, onheap3.size());
+      Object obj3 = onheap3.getBaseObject();
+      heapMem.free(onheap3);
+      MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+      Assert.assertEquals(1024 * 1024 + 7, onheap4.size());
+      Assert.assertNotSame(obj3, onheap4.getBaseObject());
+    } finally {
+      // The CarbonProperties.getInstance() state outlives this test; restore the default so
+      // tests running afterwards (JUnit order is unspecified) still see pooling enabled.
+      CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
+              CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+    }
+  }
+}