You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by si...@apache.org on 2018/12/15 00:18:06 UTC
[arrow] branch master updated: ARROW-1807: [Java] consolidate bufs
to reduce heap (#3121)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ce12fb5 ARROW-1807: [Java] consolidate bufs to reduce heap (#3121)
ce12fb5 is described below
commit ce12fb55107e2ee5439267fe1a17ded8d2210849
Author: Pindikura Ravindra <ra...@dremio.com>
AuthorDate: Sat Dec 15 05:48:01 2018 +0530
ARROW-1807: [Java] consolidate bufs to reduce heap (#3121)
- For fixed-length vectors, allocate a single combined ArrowBuf
for the value and validity buffers.
- Remove the read-write locks in AllocationManager; they
contribute about 150 bytes to the heap and aren't very useful
since there isn't much contention.
---
.../org/apache/arrow/memory/AllocationManager.java | 34 ++------
.../apache/arrow/vector/BaseFixedWidthVector.java | 94 ++++++++++++++--------
.../arrow/vector/TestBufferOwnershipTransfer.java | 5 +-
.../org/apache/arrow/vector/TestListVector.java | 10 +--
4 files changed, 73 insertions(+), 70 deletions(-)
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index aaa1f50..687674f 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -22,11 +22,8 @@ import static org.apache.arrow.memory.BaseAllocator.indent;
import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
-import org.apache.arrow.memory.util.AutoCloseableLock;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.util.Preconditions;
@@ -73,9 +70,6 @@ public class AllocationManager {
// ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
// see JIRA for details
private final LowCostIdentityHashMap<BaseAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
- private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
private final long amCreationTime = System.nanoTime();
private volatile BufferLedger owningLedger;
@@ -115,9 +109,8 @@ public class AllocationManager {
"A buffer can only be associated between two allocators that share the same root.");
}
- try (AutoCloseableLock read = readLock.open()) {
-
- final BufferLedger ledger = map.get(allocator);
+ synchronized (this) {
+ BufferLedger ledger = map.get(allocator);
if (ledger != null) {
if (retain) {
ledger.inc();
@@ -125,20 +118,7 @@ public class AllocationManager {
return ledger;
}
- }
- try (AutoCloseableLock write = writeLock.open()) {
- // we have to recheck existing ledger since a second reader => writer could be competing
- // with us.
-
- final BufferLedger existingLedger = map.get(allocator);
- if (existingLedger != null) {
- if (retain) {
- existingLedger.inc();
- }
- return existingLedger;
- }
-
- final BufferLedger ledger = new BufferLedger(allocator);
+ ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
@@ -153,7 +133,7 @@ public class AllocationManager {
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
- * Can only be called when you already hold the writeLock.
+ * Can only be called when you already hold the lock.
*/
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
@@ -250,7 +230,7 @@ public class AllocationManager {
// since two balance transfers out from the allocator manager could cause incorrect
// accounting, we need to ensure
// that this won't happen by synchronizing on the allocator manager instance.
- try (AutoCloseableLock write = writeLock.open()) {
+ synchronized (this) {
if (owningLedger != this) {
return true;
}
@@ -330,7 +310,7 @@ public class AllocationManager {
allocator.assertOpen();
final int outcome;
- try (AutoCloseableLock write = writeLock.open()) {
+ synchronized (this) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
@@ -431,7 +411,7 @@ public class AllocationManager {
* @return Amount of accounted(owned) memory associated with this ledger.
*/
public int getAccountedSize() {
- try (AutoCloseableLock read = readLock.open()) {
+ synchronized (this) {
if (owningLedger == this) {
return size;
} else {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
index bc0b77a..f69a9d1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
@@ -270,7 +270,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
long curAllocationSizeValue = valueAllocationSizeInBytes;
long curAllocationSizeValidity = validityAllocationSizeInBytes;
- if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) {
+ if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory exceeds limit");
}
@@ -302,7 +302,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
valueBufferSize = validityBufferSize;
}
- if (valueBufferSize > MAX_ALLOCATION_SIZE) {
+ if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
}
@@ -317,6 +317,13 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
}
}
+ /*
+ * Align to an 8-byte boundary (round the size up to the next multiple of 8).
+ */
+ private long align(long size) {
+ return ((size + 7) / 8) * 8;
+ }
+
/**
* Actual memory allocation is done by this function. All the calculations
* and knowledge about what size to allocate is upto the callers of this
@@ -327,14 +334,24 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
* conditions.
*/
private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
- /* allocate data buffer */
- int curSize = (int) valueBufferSize;
- valueBuffer = allocator.buffer(curSize);
+ int valueBufferSlice = (int)align(valueBufferSize);
+ int validityBufferSlice = (int)validityBufferSize;
+
+ /* allocate combined buffer */
+ ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice);
+
+ valueAllocationSizeInBytes = valueBufferSlice;
+ valueBuffer = buffer.slice(0, valueBufferSlice);
+ valueBuffer.retain();
valueBuffer.readerIndex(0);
- valueAllocationSizeInBytes = curSize;
- /* allocate validity buffer */
- allocateValidityBuffer((int) validityBufferSize);
+
+ validityAllocationSizeInBytes = validityBufferSlice;
+ validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice);
+ validityBuffer.retain();
+ validityBuffer.readerIndex(0);
zeroVector();
+
+ buffer.release();
}
/**
@@ -422,43 +439,50 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
*/
@Override
public void reAlloc() {
- valueBuffer = reallocBufferHelper(valueBuffer, true);
- validityBuffer = reallocBufferHelper(validityBuffer, false);
- }
-
- /**
- * Helper method for reallocating a particular internal buffer
- * Returns the new buffer.
- */
- private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) {
- final int currentBufferCapacity = buffer.capacity();
- long baseSize = (dataBuffer ? valueAllocationSizeInBytes
- : validityAllocationSizeInBytes);
+ int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes);
+ long newValueBufferSlice = align(valueBaseSize * 2L);
+ long newValidityBufferSlice;
+ if (typeWidth > 0) {
+ long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice));
+ long targetValueCount = targetValueBufferSize / typeWidth;
+ targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount);
+ if (newValueBufferSlice < targetValueBufferSize) {
+ newValueBufferSlice = targetValueBufferSize;
+ }
- if (baseSize < (long) currentBufferCapacity) {
- baseSize = (long) currentBufferCapacity;
+ newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth));
+ } else {
+ newValidityBufferSlice = newValueBufferSlice;
}
- long newAllocationSize = baseSize * 2L;
- newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+ long newAllocationSize = newValueBufferSlice + newValidityBufferSlice;
assert newAllocationSize >= 1;
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
}
- final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
- newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
- newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
- buffer.release(1);
- buffer = newBuf;
- if (dataBuffer) {
- valueAllocationSizeInBytes = (int) newAllocationSize;
- } else {
- validityAllocationSizeInBytes = (int) newAllocationSize;
- }
+ final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize);
+ final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice);
+ newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity());
+ newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity());
+ newValueBuffer.retain();
+ newValueBuffer.readerIndex(0);
+ valueBuffer.release();
+ valueBuffer = newValueBuffer;
+ valueAllocationSizeInBytes = (int)newValueBufferSlice;
+
+ final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice,
+ (int)newValidityBufferSlice);
+ newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity());
+ newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity());
+ newValidityBuffer.retain();
+ newValidityBuffer.readerIndex(0);
+ validityBuffer.release();
+ validityBuffer = newValidityBuffer;
+ validityAllocationSizeInBytes = (int)newValidityBufferSlice;
- return buffer;
+ newBuffer.release();
}
@Override
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
index 48bc893..9165343 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
@@ -40,15 +40,14 @@ public class TestBufferOwnershipTransfer {
IntVector v1 = new IntVector("v1", childAllocator1);
v1.allocateNew();
v1.setValueCount(4095);
+ long totalAllocatedMemory = childAllocator1.getAllocatedMemory();
IntVector v2 = new IntVector("v2", childAllocator2);
v1.makeTransferPair(v2).transfer();
assertEquals(0, childAllocator1.getAllocatedMemory());
- int expectedBitVector = 512;
- int expectedValueVector = 4096 * 4;
- assertEquals(expectedBitVector + expectedValueVector, childAllocator2.getAllocatedMemory());
+ assertEquals(totalAllocatedMemory, childAllocator2.getAllocatedMemory());
}
@Test
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index 4e8d8f0..68102b1 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -774,13 +774,13 @@ public class TestListVector {
vector.setInitialCapacity(512);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
- assertEquals(4096, vector.getDataVector().getValueCapacity());
+ assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 5);
/* use density as 4 */
vector.setInitialCapacity(512, 4);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
- assertEquals(512 * 4, vector.getDataVector().getValueCapacity());
+ assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 4);
/**
* inner value capacity we pass to data vector is 512 * 0.1 => 51
@@ -793,7 +793,7 @@ public class TestListVector {
vector.setInitialCapacity(512, 0.1);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
- assertEquals(64, vector.getDataVector().getValueCapacity());
+ assertTrue(vector.getDataVector().getValueCapacity() >= 51);
/**
* inner value capacity we pass to data vector is 512 * 0.01 => 5
@@ -806,7 +806,7 @@ public class TestListVector {
vector.setInitialCapacity(512, 0.01);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
- assertEquals(8, vector.getDataVector().getValueCapacity());
+ assertTrue(vector.getDataVector().getValueCapacity() >= 5);
/**
* inner value capacity we pass to data vector is 5 * 0.1 => 0
@@ -822,7 +822,7 @@ public class TestListVector {
vector.setInitialCapacity(5, 0.1);
vector.allocateNew();
assertEquals(7, vector.getValueCapacity());
- assertEquals(1, vector.getDataVector().getValueCapacity());
+ assertTrue(vector.getDataVector().getValueCapacity() >= 1);
}
}