You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ra...@apache.org on 2019/04/03 05:30:46 UTC

[arrow] branch master updated: ARROW-4913:[Java][Memory] Add additional methods for observing allocations.

This is an automated email from the ASF dual-hosted git repository.

ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 34ff4b4  ARROW-4913:[Java][Memory] Add additional methods for observing allocations.
34ff4b4 is described below

commit 34ff4b4d35624d0005a953adf751e4e76537d66b
Author: Praveen <pr...@dremio.com>
AuthorDate: Wed Apr 3 11:00:22 2019 +0530

    ARROW-4913:[Java][Memory] Add additional methods for observing allocations.
    
    - Adds methods that let observers calculate the total number of live ledgers and buffers.
    - Allows clients to clamp down on heap growth due to these buffers.
    
    Author: Praveen <pr...@dremio.com>
    Author: Jacques Nadeau <ja...@apache.org>
    
    Closes #4012 from praveenbingo/ARROW-4913 and squashes the following commits:
    
    4671d032 <Praveen> Fix some checkstyle failures.
    5af55057 <Jacques Nadeau> ARROW-TBD: Add additional methods to the allocation listener interface for PreAllocation and Release.
---
 .../apache/arrow/memory/AllocationListener.java    | 49 ++++++++++++----------
 .../org/apache/arrow/memory/AllocationManager.java |  1 +
 .../org/apache/arrow/memory/BaseAllocator.java     |  5 ++-
 .../org/apache/arrow/memory/RootAllocator.java     |  4 ++
 .../org/apache/arrow/memory/TestBaseAllocator.java | 39 ++++++++++++++++-
 5 files changed, 73 insertions(+), 25 deletions(-)

diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
index 4fd5330..fba6a70 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -25,31 +25,34 @@ package org.apache.arrow.memory;
  */
 public interface AllocationListener {
 
-  public static final AllocationListener NOOP = new AllocationListener() {
-    @Override
-    public void onAllocation(long size) {
-    }
+  public static final AllocationListener NOOP = new AllocationListener() {};
 
-    @Override
-    public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
-      return false;
-    }
-
-    @Override
-    public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
-    }
-
-    @Override
-    public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
-    }
-  };
+  /**
+   * Called each time a new buffer has been requested.
+   *
+   * <p>An exception can be safely thrown by this method to terminate the allocation.
+   *
+   * @param size the buffer size being allocated
+   */
+  default void onPreAllocation(long size) {}
 
   /**
-   * Called each time a new buffer is allocated.
+   * Called each time a new buffer has been allocated.
+   *
+   * <p>An exception cannot be thrown by this method.
    *
    * @param size the buffer size being allocated
    */
-  void onAllocation(long size);
+  default void onAllocation(long size) {}
+
+  /**
+   * Called each time a buffer is released from allocation.
+   *
+   * <p>An exception cannot be thrown by this method.
+   * @param size The size of the buffer being released.
+   */
+  default void onRelease(long size) {}
+
 
   /**
    * Called whenever an allocation failed, giving the caller a chance to create some space in the
@@ -60,7 +63,9 @@ public interface AllocationListener {
    * @param outcome  the outcome of the failed allocation. Carries information of what failed
    * @return true, if the allocation can be retried; false if the allocation should fail
    */
-  boolean onFailedAllocation(long size, AllocationOutcome outcome);
+  default boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+    return false;
+  }
 
   /**
    * Called immediately after a child allocator was added to the parent allocator.
@@ -68,7 +73,7 @@ public interface AllocationListener {
    * @param parentAllocator The parent allocator to which a child was added
    * @param childAllocator  The child allocator that was just added
    */
-  void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator);
+  default void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {}
 
   /**
    * Called immediately after a child allocator was removed from the parent allocator.
@@ -76,5 +81,5 @@ public interface AllocationListener {
    * @param parentAllocator The parent allocator from which a child was removed
    * @param childAllocator The child allocator that was just removed
    */
-  void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator);
+  default void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {}
 }
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index c10d246..3a8d465 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -147,6 +147,7 @@ public class AllocationManager {
         // no one else owns, lets release.
         oldLedger.allocator.releaseBytes(size);
         underlying.release();
+        oldLedger.allocator.listener.onRelease(size);
         amDestructionTime = System.nanoTime();
         owningLedger = null;
       } else {
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 34613b4..bd84aef 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -49,7 +49,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   final String name;
   final RootAllocator root;
   private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
-  private final AllocationListener listener;
+  final AllocationListener listener;
   private final BaseAllocator parentAllocator;
   private final ArrowByteBufAllocator thisAsByteBufAllocator;
   private final IdentityHashMap<BaseAllocator, Object> childAllocators;
@@ -277,6 +277,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ?
         nextPowerOfTwo(initialRequestSize)
         : initialRequestSize;
+
+    listener.onPreAllocation(actualRequestSize);
+
     AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
     if (!outcome.isOk()) {
       if (listener.onFailedAllocation(actualRequestSize, outcome)) {
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index b6fefd7..3023a14 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -26,6 +26,10 @@ import org.apache.arrow.util.VisibleForTesting;
  */
 public class RootAllocator extends BaseAllocator {
 
+  public RootAllocator() {
+    this(AllocationListener.NOOP, Long.MAX_VALUE);
+  }
+
   public RootAllocator(final long limit) {
     this(AllocationListener.NOOP, limit);
   }
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index c53eb06..4da7434 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -228,7 +228,9 @@ public class TestBaseAllocator {
   // It counts the number of times it has been invoked, and how much memory allocation it has seen
   // When set to 'expand on fail', it attempts to expand the associated allocator's limit
   private static final class TestAllocationListener implements AllocationListener {
+    private int numPreCalls;
     private int numCalls;
+    private int numReleaseCalls;
     private int numChildren;
     private long totalMem;
     private boolean expandOnFail;
@@ -245,6 +247,11 @@ public class TestBaseAllocator {
     }
 
     @Override
+    public void onPreAllocation(long size) {
+      numPreCalls++;
+    }
+
+    @Override
     public void onAllocation(long size) {
       numCalls++;
       totalMem += size;
@@ -259,6 +266,12 @@ public class TestBaseAllocator {
       return false;
     }
 
+
+    @Override
+    public void onRelease(long size) {
+      numReleaseCalls++;
+    }
+
     @Override
     public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
       ++numChildren;
@@ -275,6 +288,14 @@ public class TestBaseAllocator {
       this.expandLimit = expandLimit;
     }
 
+    int getNumPreCalls() {
+      return numPreCalls;
+    }
+
+    int getNumReleaseCalls() {
+      return numReleaseCalls;
+    }
+
     int getNumCalls() {
       return numCalls;
     }
@@ -291,11 +312,15 @@ public class TestBaseAllocator {
   @Test
   public void testRootAllocator_listeners() throws Exception {
     TestAllocationListener l1 = new TestAllocationListener();
+    assertEquals(0, l1.getNumPreCalls());
     assertEquals(0, l1.getNumCalls());
+    assertEquals(0, l1.getNumReleaseCalls());
     assertEquals(0, l1.getNumChildren());
     assertEquals(0, l1.getTotalMem());
     TestAllocationListener l2 = new TestAllocationListener();
+    assertEquals(0, l2.getNumPreCalls());
     assertEquals(0, l2.getNumCalls());
+    assertEquals(0, l2.getNumReleaseCalls());
     assertEquals(0, l2.getNumChildren());
     assertEquals(0, l2.getTotalMem());
     // root and first-level child share the first listener
@@ -305,7 +330,9 @@ public class TestBaseAllocator {
         assertEquals(1, l1.getNumChildren());
         final ArrowBuf buf1 = c1.buffer(16);
         assertNotNull("allocation failed", buf1);
+        assertEquals(1, l1.getNumPreCalls());
         assertEquals(1, l1.getNumCalls());
+        assertEquals(0, l1.getNumReleaseCalls());
         assertEquals(16, l1.getTotalMem());
         buf1.release();
         try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) {
@@ -315,7 +342,9 @@ public class TestBaseAllocator {
           assertNotNull("allocation failed", buf2);
           assertEquals(1, l1.getNumCalls());
           assertEquals(16, l1.getTotalMem());
+          assertEquals(1, l2.getNumPreCalls());
           assertEquals(1, l2.getNumCalls());
+          assertEquals(0, l2.getNumReleaseCalls());
           assertEquals(32, l2.getTotalMem());
           buf2.release();
           try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
@@ -323,9 +352,13 @@ public class TestBaseAllocator {
             assertEquals(1, l2.getNumChildren());
             final ArrowBuf buf3 = c3.buffer(64);
             assertNotNull("allocation failed", buf3);
+            assertEquals(1, l1.getNumPreCalls());
             assertEquals(1, l1.getNumCalls());
+            assertEquals(1, l1.getNumReleaseCalls());
             assertEquals(16, l1.getTotalMem());
+            assertEquals(2, l2.getNumPreCalls());
             assertEquals(2, l2.getNumCalls());
+            assertEquals(1, l2.getNumReleaseCalls());
             assertEquals(32 + 64, l2.getTotalMem());
             buf3.release();
           }
@@ -336,6 +369,8 @@ public class TestBaseAllocator {
         assertEquals(0, l2.getNumChildren());
       }
       assertEquals(0, l1.getNumChildren()); // first-level child removed
+
+      assertEquals(2, l2.getNumReleaseCalls());
     }
   }
 
@@ -505,7 +540,7 @@ public class TestBaseAllocator {
       assertEquals(0, arrowBuf.writerIndex());
       assertEquals(256, arrowBuf.writableBytes());
 
-      final ArrowBuf slice3 = (ArrowBuf) arrowBuf.slice();
+      final ArrowBuf slice3 = arrowBuf.slice();
       assertEquals(0, slice3.readerIndex());
       assertEquals(0, slice3.readableBytes());
       assertEquals(0, slice3.writerIndex());
@@ -520,7 +555,7 @@ public class TestBaseAllocator {
       assertEquals(256, arrowBuf.writerIndex());
       assertEquals(0, arrowBuf.writableBytes());
 
-      final ArrowBuf slice1 = (ArrowBuf) arrowBuf.slice();
+      final ArrowBuf slice1 = arrowBuf.slice();
       assertEquals(0, slice1.readerIndex());
       assertEquals(256, slice1.readableBytes());
       for (int i = 0; i < 10; ++i) {