You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/05/31 03:36:24 UTC

[arrow] branch master updated: ARROW-5429: [Java] Provide alternative buffer allocation policy

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 978a167  ARROW-5429: [Java] Provide alternative buffer allocation policy
978a167 is described below

commit 978a1672e4e4f9913685aea4661ba812071412cd
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Thu May 30 20:35:42 2019 -0700

    ARROW-5429: [Java] Provide alternative buffer allocation policy
    
    The current buffer allocation policy works like this:
    
    1. If the requested buffer size is greater than or equal to the chunk size, the buffer size is left unchanged.
    2. If the requested size is smaller than the chunk size, the buffer size is rounded up to the next power of 2.
    
    This policy can waste memory in some cases. For example, if we request a buffer of size 10 MB, Arrow will round the buffer size up to 16 MB. If we only need 10 MB, this wastes (16 - 10) / 10 = 60% more memory than we actually need.
    
    So in this proposal, we provide another policy: the rounded buffer size must be a multiple of some memory unit, such as 32 KB. This policy has two benefits:
    
    1. The wasted memory is always less than one memory unit (32 KB), which is much less than the waste possible under the power-of-two policy.
    2. This is the memory allocation policy adopted by some computation engines (e.g. Apache Flink).
    
    Author: liyafan82 <fa...@foxmail.com>
    
    Closes #4400 from liyafan82/fly_5429 and squashes the following commits:
    
    6b44dce2 <liyafan82> Merge branch 'master' into fly_5429
    ed795105 <liyafan82> Resolve comments and evaluate performance
    578d5fde <liyafan82> Resolve comments
    a6840e15 <liyafan82> Remove useless import
    36bbeb14 <liyafan82> Provide alternative buffer allocation policy
---
 .../org/apache/arrow/memory/BaseAllocator.java     | 16 +++---
 .../org/apache/arrow/memory/ChildAllocator.java    | 16 +++---
 .../org/apache/arrow/memory/RootAllocator.java     |  8 ++-
 .../memory/rounding/DefaultRoundingPolicy.java     | 54 +++++++++++++++++++
 .../RoundingPolicy.java}                           | 32 ++----------
 .../memory/rounding/SegmentRoundingPolicy.java     | 60 +++++++++++++++++++++
 .../org/apache/arrow/memory/TestBaseAllocator.java | 61 ++++++++++++++++++++++
 7 files changed, 205 insertions(+), 42 deletions(-)

diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 2cf1e5b..c1e7a12 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -25,6 +25,7 @@ import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.arrow.memory.rounding.RoundingPolicy;
 import org.apache.arrow.memory.util.AssertionUtil;
 import org.apache.arrow.memory.util.HistoricalLog;
 import org.apache.arrow.util.Preconditions;
@@ -60,6 +61,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   private final IdentityHashMap<Reservation, Object> reservations;
   private final HistoricalLog historicalLog;
   private volatile boolean isClosed = false; // the allocator has been closed
+  private final RoundingPolicy roundingPolicy;
 
   /**
    * Initialize an allocator
@@ -76,7 +78,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       final AllocationListener listener,
       final String name,
       final long initReservation,
-      final long maxAllocation) throws OutOfMemoryException {
+      final long maxAllocation,
+      final RoundingPolicy roundingPolicy) throws OutOfMemoryException {
     super(parentAllocator, name, initReservation, maxAllocation);
 
     this.listener = listener;
@@ -108,7 +111,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       historicalLog = null;
       childLedgers = null;
     }
-
+    this.roundingPolicy = roundingPolicy;
   }
 
   AllocationListener getListener() {
@@ -288,11 +291,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       return empty;
     }
 
-    // round to next largest power of two if we're within a chunk since that is how our allocator
-    // operates
-    final int actualRequestSize =
-        initialRequestSize < AllocationManager.CHUNK_SIZE ?
-          nextPowerOfTwo(initialRequestSize) : initialRequestSize;
+    // round the request size according to the rounding policy
+    final int actualRequestSize = roundingPolicy.getRoundedSize(initialRequestSize);
 
     listener.onPreAllocation(actualRequestSize);
 
@@ -375,7 +375,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     assertOpen();
 
     final ChildAllocator childAllocator =
-        new ChildAllocator(listener, this, name, initReservation, maxAllocation);
+        new ChildAllocator(listener, this, name, initReservation, maxAllocation, roundingPolicy);
 
     if (DEBUG) {
       synchronized (DEBUG_LOCK) {
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index 98c316c..d6bac3d 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -18,6 +18,8 @@
 package org.apache.arrow.memory;
 
 
+import org.apache.arrow.memory.rounding.RoundingPolicy;
+
 /**
  * Child allocator class. Only slightly different from the {@see RootAllocator},
  * in that these can't be created directly, but must be obtained from
@@ -39,14 +41,16 @@ class ChildAllocator extends BaseAllocator {
    *                        this includes direct allocations (via {@see BufferAllocator#buffer(int,
    *int)} et al) and requests from descendant allocators. Depending on the
    *                        allocation policy in force, even less memory may be available
+   * @param roundingPolicy the policy for rounding requested buffer size
    */
   ChildAllocator(
-      AllocationListener listener,
-      BaseAllocator parentAllocator,
-      String name,
-      long initReservation,
-      long maxAllocation) {
-    super(parentAllocator, listener, name, initReservation, maxAllocation);
+          AllocationListener listener,
+          BaseAllocator parentAllocator,
+          String name,
+          long initReservation,
+          long maxAllocation,
+          RoundingPolicy roundingPolicy) {
+    super(parentAllocator, listener, name, initReservation, maxAllocation, roundingPolicy);
   }
 
 
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 3023a14..02e27ab 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -17,6 +17,8 @@
 
 package org.apache.arrow.memory;
 
+import java.util.Objects;
+
+import org.apache.arrow.memory.rounding.DefaultRoundingPolicy;
+import org.apache.arrow.memory.rounding.RoundingPolicy;
 import org.apache.arrow.util.VisibleForTesting;
 
 /**
@@ -35,7 +37,11 @@ public class RootAllocator extends BaseAllocator {
   }
 
   public RootAllocator(final AllocationListener listener, final long limit) {
-    super(null, listener, "ROOT", 0, limit);
+    this(listener, limit, DefaultRoundingPolicy.INSTANCE);
+  }
+
+  /**
+   * Creates a root allocator that rounds every requested buffer size with the given policy.
+   * Child allocators created from this allocator are handed the same policy.
+   *
+   * @param listener the allocation listener
+   * @param limit the maximum allocation for this allocator
+   * @param roundingPolicy the policy for rounding requested buffer sizes; must not be null
+   * @throws NullPointerException if {@code roundingPolicy} is null
+   */
+  public RootAllocator(final AllocationListener listener, final long limit,
+      final RoundingPolicy roundingPolicy) {
+    // Fail fast here instead of with an NPE on the first allocation, where the policy is first used.
+    super(null, listener, "ROOT", 0, limit, Objects.requireNonNull(roundingPolicy, "roundingPolicy"));
   }
 
   /**
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java
new file mode 100644
index 0000000..7bc8393
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory.rounding;
+
+import java.lang.reflect.Field;
+
+import org.apache.arrow.memory.AllocationManager;
+import org.apache.arrow.memory.BaseAllocator;
+
+/**
+ * The default rounding policy, matching the allocator's previous built-in behavior: a request
+ * smaller than the allocation manager's chunk size is rounded up to the next power of two;
+ * a request of at least the chunk size is returned unchanged.
+ */
+public class DefaultRoundingPolicy implements RoundingPolicy {
+
+  /**
+   * The chunk size read from {@code AllocationManager.CHUNK_SIZE}. Requests strictly smaller
+   * than this value are rounded up to a power of two.
+   */
+  public final long chunkSize;
+
+  /**
+   * The singleton instance.
+   */
+  public static final DefaultRoundingPolicy INSTANCE = new DefaultRoundingPolicy();
+
+  /**
+   * Reads the chunk size from {@link AllocationManager}.
+   *
+   * <p>The field is read reflectively because this class lives outside the
+   * {@code org.apache.arrow.memory} package (presumably {@code CHUNK_SIZE} is not public).
+   *
+   * @throws RuntimeException if the field cannot be read; the underlying failure is the cause
+   */
+  private DefaultRoundingPolicy() {
+    try {
+      Field field = AllocationManager.class.getDeclaredField("CHUNK_SIZE");
+      field.setAccessible(true);
+      chunkSize = (Long) field.get(null);
+    } catch (Exception e) {
+      // Keep the cause: without it a renamed field, a type change or an access failure
+      // all surface as the same opaque message.
+      throw new RuntimeException("Failed to get chunk size from allocation manager", e);
+    }
+  }
+
+  /**
+   * Rounds sizes below the chunk size up to the next power of two; larger sizes are unchanged.
+   */
+  @Override
+  public int getRoundedSize(int requestSize) {
+    return requestSize < chunkSize ?
+            BaseAllocator.nextPowerOfTwo(requestSize) : requestSize;
+  }
+}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/RoundingPolicy.java
similarity index 51%
copy from java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
copy to java/memory/src/main/java/org/apache/arrow/memory/rounding/RoundingPolicy.java
index 3023a14..f3e0e29 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/RoundingPolicy.java
@@ -15,34 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.memory;
-
-import org.apache.arrow.util.VisibleForTesting;
+package org.apache.arrow.memory.rounding;
 
 /**
- * A root allocator for using direct memory for Arrow Vectors/Arrays. Supports creating a
- * tree of descendant child allocators to facilitate better instrumentation of memory
- * allocations.
+ * The policy for rounding the buffer size, to improve performance and avoid memory fragmentation.
+ * In particular, given a requested buffer size, the policy will determine the rounded buffer size.
  */
-public class RootAllocator extends BaseAllocator {
-
-  public RootAllocator() {
-    this(AllocationListener.NOOP, Long.MAX_VALUE);
-  }
-
-  public RootAllocator(final long limit) {
-    this(AllocationListener.NOOP, limit);
-  }
-
-  public RootAllocator(final AllocationListener listener, final long limit) {
-    super(null, listener, "ROOT", 0, limit);
-  }
-
-  /**
-   * Verify the accounting state of the allocation system.
-   */
-  @VisibleForTesting
-  public void verify() {
-    verifyAllocator();
-  }
+public interface RoundingPolicy {
+  /**
+   * Computes the buffer size the allocator should actually allocate for a request.
+   *
+   * @param requestSize the requested buffer size
+   * @return the rounded size, which the allocator uses in place of {@code requestSize}
+   */
+  int getRoundedSize(int requestSize);
 }
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/SegmentRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/SegmentRoundingPolicy.java
new file mode 100644
index 0000000..52e3641
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/SegmentRoundingPolicy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory.rounding;
+
+import org.apache.arrow.util.Preconditions;
+
+/**
+ * The rounding policy that each buffer size must a multiple of the segment size.
+ */
+public class SegmentRoundingPolicy implements  RoundingPolicy {
+
+  /**
+   * The minimal segment size.
+   */
+  public static final long MIN_SEGMENT_SIZE = 1024L;
+
+  /**
+   * The segment size. It must be at least {@link SegmentRoundingPolicy#MIN_SEGMENT_SIZE},
+   * and be a power of 2.
+   */
+  private int segmentSize;
+
+  /**
+   * Constructor for the segment rounding policy.
+   * @param segmentSize the segment size.
+   * @throws IllegalArgumentException if the segment size is smaller than
+   * {@link SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, or is not a power of 2.
+   */
+  public SegmentRoundingPolicy(int segmentSize) {
+    Preconditions.checkArgument(segmentSize >= MIN_SEGMENT_SIZE,
+            "The segment size cannot be smaller than " + MIN_SEGMENT_SIZE);
+    Preconditions.checkArgument((segmentSize & (segmentSize - 1)) == 0,
+            "The segment size must be a power of 2");
+    this.segmentSize = segmentSize;
+  }
+
+  @Override
+  public int getRoundedSize(int requestSize) {
+    return (requestSize + (segmentSize - 1)) / segmentSize * segmentSize;
+  }
+
+  public int getSegmentSize() {
+    return segmentSize;
+  }
+}
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index 823187b..38e1298 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -31,8 +31,11 @@ import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.arrow.memory.AllocationOutcomeDetails.Entry;
+import org.apache.arrow.memory.rounding.RoundingPolicy;
+import org.apache.arrow.memory.rounding.SegmentRoundingPolicy;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
 import io.netty.buffer.ArrowBuf;
 
@@ -297,6 +300,64 @@ public class TestBaseAllocator {
     }
   }
 
+  @Test
+  public void testSegmentAllocator() {
+    RoundingPolicy policy = new SegmentRoundingPolicy(1024);
+    try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy)) {
+      assertSegmentRounding(allocator);
+    }
+  }
+
+  @Test
+  public void testSegmentAllocator_childAllocator() {
+    RoundingPolicy policy = new SegmentRoundingPolicy(1024);
+    try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy);
+        BufferAllocator childAllocator = allocator.newChildAllocator("child", 0, 512 * 1024)) {
+
+      assertEquals("child", childAllocator.getName());
+      assertSegmentRounding(childAllocator);
+    }
+  }
+
+  /**
+   * Allocates two buffers from {@code allocator}, which is expected to use a 1024-byte
+   * {@link SegmentRoundingPolicy}, and checks their rounded capacities and that they are
+   * writable and readable. Each buffer is closed even when a check fails, so a failed
+   * assertion does not also leave the buffer open when the allocator is closed.
+   */
+  private static void assertSegmentRounding(BufferAllocator allocator) {
+    // a request smaller than one segment rounds up to exactly one segment
+    try (ArrowBuf buf = allocator.buffer(798)) {
+      assertEquals(1024, buf.capacity());
+      buf.setInt(333, 959);
+      assertEquals(959, buf.getInt(333));
+    }
+
+    // one byte past a segment boundary rounds up to two segments
+    try (ArrowBuf buf = allocator.buffer(1025)) {
+      assertEquals(2048, buf.capacity());
+      buf.setInt(193, 939);
+      assertEquals(939, buf.getInt(193));
+    }
+  }
+
+  @Test
+  public void testSegmentAllocator_smallSegment() {
+    // 128 is below the 1024-byte minimum segment size, so construction must be rejected.
+    IllegalArgumentException thrown = Assertions.assertThrows(
+        IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128));
+    String expectedMessage = "The segment size cannot be smaller than 1024";
+    assertEquals(expectedMessage, thrown.getMessage());
+  }
+
+  @Test
+  public void testSegmentAllocator_segmentSizeNotPowerOf2() {
+    // 4097 passes the minimum-size check but is not a power of two.
+    IllegalArgumentException thrown = Assertions.assertThrows(
+        IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097));
+    String expectedMessage = "The segment size must be a power of 2";
+    assertEquals(expectedMessage, thrown.getMessage());
+  }
+
   // Allocation listener
   // It counts the number of times it has been invoked, and how much memory allocation it has seen
   // When set to 'expand on fail', it attempts to expand the associated allocator's limit