You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by si...@apache.org on 2018/12/15 00:18:06 UTC

[arrow] branch master updated: ARROW-1807: [Java] consolidate bufs to reduce heap (#3121)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ce12fb5  ARROW-1807: [Java] consolidate bufs to reduce heap (#3121)
ce12fb5 is described below

commit ce12fb55107e2ee5439267fe1a17ded8d2210849
Author: Pindikura Ravindra <ra...@dremio.com>
AuthorDate: Sat Dec 15 05:48:01 2018 +0530

    ARROW-1807: [Java] consolidate bufs to reduce heap (#3121)
    
    - For fixed-width vectors, allocate a single combined ArrowBuf for
      the value and validity buffers.
    - Remove the read-write locks in AllocationManager; they
      contribute about 150 bytes of heap per instance and aren't very
      useful since there isn't much contention.
---
 .../org/apache/arrow/memory/AllocationManager.java | 34 ++------
 .../apache/arrow/vector/BaseFixedWidthVector.java  | 94 ++++++++++++++--------
 .../arrow/vector/TestBufferOwnershipTransfer.java  |  5 +-
 .../org/apache/arrow/vector/TestListVector.java    | 10 +--
 4 files changed, 73 insertions(+), 70 deletions(-)

diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index aaa1f50..687674f 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -22,11 +22,8 @@ import static org.apache.arrow.memory.BaseAllocator.indent;
 import java.util.IdentityHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.arrow.memory.BaseAllocator.Verbosity;
-import org.apache.arrow.memory.util.AutoCloseableLock;
 import org.apache.arrow.memory.util.HistoricalLog;
 import org.apache.arrow.util.Preconditions;
 
@@ -73,9 +70,6 @@ public class AllocationManager {
   // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
   // see JIRA for details
   private final LowCostIdentityHashMap<BaseAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
-  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
   private final long amCreationTime = System.nanoTime();
 
   private volatile BufferLedger owningLedger;
@@ -115,9 +109,8 @@ public class AllocationManager {
           "A buffer can only be associated between two allocators that share the same root.");
     }
 
-    try (AutoCloseableLock read = readLock.open()) {
-
-      final BufferLedger ledger = map.get(allocator);
+    synchronized (this) {
+      BufferLedger ledger = map.get(allocator);
       if (ledger != null) {
         if (retain) {
           ledger.inc();
@@ -125,20 +118,7 @@ public class AllocationManager {
         return ledger;
       }
 
-    }
-    try (AutoCloseableLock write = writeLock.open()) {
-      // we have to recheck existing ledger since a second reader => writer could be competing
-      // with us.
-
-      final BufferLedger existingLedger = map.get(allocator);
-      if (existingLedger != null) {
-        if (retain) {
-          existingLedger.inc();
-        }
-        return existingLedger;
-      }
-
-      final BufferLedger ledger = new BufferLedger(allocator);
+      ledger = new BufferLedger(allocator);
       if (retain) {
         ledger.inc();
       }
@@ -153,7 +133,7 @@ public class AllocationManager {
    * The way that a particular BufferLedger communicates back to the AllocationManager that it
    * now longer needs to hold
    * a reference to particular piece of memory.
-   * Can only be called when you already hold the writeLock.
+   * Can only be called when you already hold the lock.
    */
   private void release(final BufferLedger ledger) {
     final BaseAllocator allocator = ledger.getAllocator();
@@ -250,7 +230,7 @@ public class AllocationManager {
       // since two balance transfers out from the allocator manager could cause incorrect
       // accounting, we need to ensure
       // that this won't happen by synchronizing on the allocator manager instance.
-      try (AutoCloseableLock write = writeLock.open()) {
+      synchronized (this) {
         if (owningLedger != this) {
           return true;
         }
@@ -330,7 +310,7 @@ public class AllocationManager {
       allocator.assertOpen();
 
       final int outcome;
-      try (AutoCloseableLock write = writeLock.open()) {
+      synchronized (this) {
         outcome = bufRefCnt.addAndGet(-decrement);
         if (outcome == 0) {
           lDestructionTime = System.nanoTime();
@@ -431,7 +411,7 @@ public class AllocationManager {
      * @return Amount of accounted(owned) memory associated with this ledger.
      */
     public int getAccountedSize() {
-      try (AutoCloseableLock read = readLock.open()) {
+      synchronized (this) {
         if (owningLedger == this) {
           return size;
         } else {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
index bc0b77a..f69a9d1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
@@ -270,7 +270,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
     long curAllocationSizeValue = valueAllocationSizeInBytes;
     long curAllocationSizeValidity = validityAllocationSizeInBytes;
 
-    if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) {
+    if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory exceeds limit");
     }
 
@@ -302,7 +302,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
       valueBufferSize = validityBufferSize;
     }
 
-    if (valueBufferSize > MAX_ALLOCATION_SIZE) {
+    if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
     }
 
@@ -317,6 +317,13 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
     }
   }
 
+  /*
+   * Round size up to the next multiple of 8 (8-byte alignment).
+   */
+  private long align(long size) {
+    return ((size + 7) / 8) * 8;
+  }
+
   /**
    * Actual memory allocation is done by this function. All the calculations
    * and knowledge about what size to allocate is upto the callers of this
@@ -327,14 +334,24 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
    * conditions.
    */
   private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
-    /* allocate data buffer */
-    int curSize = (int) valueBufferSize;
-    valueBuffer = allocator.buffer(curSize);
+    int valueBufferSlice = (int)align(valueBufferSize);
+    int validityBufferSlice = (int)validityBufferSize;
+
+    /* allocate combined buffer */
+    ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice);
+
+    valueAllocationSizeInBytes = valueBufferSlice;
+    valueBuffer = buffer.slice(0, valueBufferSlice);
+    valueBuffer.retain();
     valueBuffer.readerIndex(0);
-    valueAllocationSizeInBytes = curSize;
-    /* allocate validity buffer */
-    allocateValidityBuffer((int) validityBufferSize);
+
+    validityAllocationSizeInBytes = validityBufferSlice;
+    validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice);
+    validityBuffer.retain();
+    validityBuffer.readerIndex(0);
     zeroVector();
+
+    buffer.release();
   }
 
   /**
@@ -422,43 +439,50 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
    */
   @Override
   public void reAlloc() {
-    valueBuffer = reallocBufferHelper(valueBuffer, true);
-    validityBuffer = reallocBufferHelper(validityBuffer, false);
-  }
-
-  /**
-   * Helper method for reallocating a particular internal buffer
-   * Returns the new buffer.
-   */
-  private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) {
-    final int currentBufferCapacity = buffer.capacity();
-    long baseSize = (dataBuffer ? valueAllocationSizeInBytes
-            : validityAllocationSizeInBytes);
+    int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes);
+    long newValueBufferSlice = align(valueBaseSize * 2L);
+    long newValidityBufferSlice;
+    if (typeWidth > 0) {
+      long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice));
+      long targetValueCount = targetValueBufferSize / typeWidth;
+      targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount);
+      if (newValueBufferSlice < targetValueBufferSize) {
+        newValueBufferSlice = targetValueBufferSize;
+      }
 
-    if (baseSize < (long) currentBufferCapacity) {
-      baseSize = (long) currentBufferCapacity;
+      newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth));
+    } else {
+      newValidityBufferSlice = newValueBufferSlice;
     }
 
-    long newAllocationSize = baseSize * 2L;
-    newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+    long newAllocationSize = newValueBufferSlice + newValidityBufferSlice;
     assert newAllocationSize >= 1;
 
     if (newAllocationSize > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Unable to expand the buffer");
     }
 
-    final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
-    newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
-    newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
-    buffer.release(1);
-    buffer = newBuf;
-    if (dataBuffer) {
-      valueAllocationSizeInBytes = (int) newAllocationSize;
-    } else {
-      validityAllocationSizeInBytes = (int) newAllocationSize;
-    }
+    final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize);
+    final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice);
+    newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity());
+    newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity());
+    newValueBuffer.retain();
+    newValueBuffer.readerIndex(0);
+    valueBuffer.release();
+    valueBuffer = newValueBuffer;
+    valueAllocationSizeInBytes = (int)newValueBufferSlice;
+
+    final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice,
+        (int)newValidityBufferSlice);
+    newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity());
+    newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity());
+    newValidityBuffer.retain();
+    newValidityBuffer.readerIndex(0);
+    validityBuffer.release();
+    validityBuffer = newValidityBuffer;
+    validityAllocationSizeInBytes = (int)newValidityBufferSlice;
 
-    return buffer;
+    newBuffer.release();
   }
 
   @Override
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
index 48bc893..9165343 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
@@ -40,15 +40,14 @@ public class TestBufferOwnershipTransfer {
     IntVector v1 = new IntVector("v1", childAllocator1);
     v1.allocateNew();
     v1.setValueCount(4095);
+    long totalAllocatedMemory = childAllocator1.getAllocatedMemory();
 
     IntVector v2 = new IntVector("v2", childAllocator2);
 
     v1.makeTransferPair(v2).transfer();
 
     assertEquals(0, childAllocator1.getAllocatedMemory());
-    int expectedBitVector = 512;
-    int expectedValueVector = 4096 * 4;
-    assertEquals(expectedBitVector + expectedValueVector, childAllocator2.getAllocatedMemory());
+    assertEquals(totalAllocatedMemory, childAllocator2.getAllocatedMemory());
   }
 
   @Test
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index 4e8d8f0..68102b1 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -774,13 +774,13 @@ public class TestListVector {
       vector.setInitialCapacity(512);
       vector.allocateNew();
       assertEquals(512, vector.getValueCapacity());
-      assertEquals(4096, vector.getDataVector().getValueCapacity());
+      assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 5);
 
       /* use density as 4 */
       vector.setInitialCapacity(512, 4);
       vector.allocateNew();
       assertEquals(512, vector.getValueCapacity());
-      assertEquals(512 * 4, vector.getDataVector().getValueCapacity());
+      assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 4);
 
       /**
        * inner value capacity we pass to data vector is 512 * 0.1 => 51
@@ -793,7 +793,7 @@ public class TestListVector {
       vector.setInitialCapacity(512, 0.1);
       vector.allocateNew();
       assertEquals(512, vector.getValueCapacity());
-      assertEquals(64, vector.getDataVector().getValueCapacity());
+      assertTrue(vector.getDataVector().getValueCapacity() >= 51);
 
       /**
        * inner value capacity we pass to data vector is 512 * 0.01 => 5
@@ -806,7 +806,7 @@ public class TestListVector {
       vector.setInitialCapacity(512, 0.01);
       vector.allocateNew();
       assertEquals(512, vector.getValueCapacity());
-      assertEquals(8, vector.getDataVector().getValueCapacity());
+      assertTrue(vector.getDataVector().getValueCapacity() >= 5);
 
       /**
        * inner value capacity we pass to data vector is 5 * 0.1 => 0
@@ -822,7 +822,7 @@ public class TestListVector {
       vector.setInitialCapacity(5, 0.1);
       vector.allocateNew();
       assertEquals(7, vector.getValueCapacity());
-      assertEquals(1, vector.getDataVector().getValueCapacity());
+      assertTrue(vector.getDataVector().getValueCapacity() >= 1);
     }
   }