You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/12 17:21:45 UTC

arrow git commit: ARROW-385: Refactors metric system

Repository: arrow
Updated Branches:
  refs/heads/master 7d3e2a3ab -> c5663c6d0


ARROW-385: Refactors metric system

Arrow has some support for metrics, but the metrics registry is by default
not configured to export values. It also forces user to user yammer/codahale
metrics library instead of the library of their choice.

To allow for integration with other metrics system, replace it with a notification
mechanism to alert user on allocation/deallocation.

Author: Laurent Goujon <la...@dremio.com>

Closes #212 from laurentgo/laurent/metrics-refactoring and squashes the following commits:

e6c435b [Laurent Goujon] ARROW-385: Refactors metric system


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c5663c6d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c5663c6d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c5663c6d

Branch: refs/heads/master
Commit: c5663c6d00dbd297dac573670156e26dc0593357
Parents: 7d3e2a3
Author: Laurent Goujon <la...@dremio.com>
Authored: Thu Jan 12 12:21:37 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jan 12 12:21:37 2017 -0500

----------------------------------------------------------------------
 java/memory/pom.xml                             |   7 -
 .../main/java/io/netty/buffer/LargeBuffer.java  |  31 +---
 .../netty/buffer/PooledByteBufAllocatorL.java   | 157 +++++++++----------
 .../netty/buffer/UnsafeDirectLittleEndian.java  |  34 +---
 .../apache/arrow/memory/AllocationListener.java |  40 +++++
 .../apache/arrow/memory/AllocationManager.java  |  13 +-
 .../org/apache/arrow/memory/BaseAllocator.java  |  30 +++-
 .../org/apache/arrow/memory/RootAllocator.java  |   7 +-
 .../org/apache/arrow/memory/util/Metrics.java   |  40 -----
 .../org/apache/arrow/memory/util/Pointer.java   |  28 ----
 10 files changed, 158 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/pom.xml
----------------------------------------------------------------------
diff --git a/java/memory/pom.xml b/java/memory/pom.xml
index 6ed1448..a4eb652 100644
--- a/java/memory/pom.xml
+++ b/java/memory/pom.xml
@@ -20,13 +20,6 @@
   <name>Arrow Memory</name>
 
   <dependencies>
-
-    <dependency>
-      <groupId>com.codahale.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>3.0.1</version>
-    </dependency>
-
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
index 5f5e904..c026e43 100644
--- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
+++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -17,43 +17,16 @@
  */
 package io.netty.buffer;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
  */
 public class LargeBuffer extends MutableWrappedByteBuf {
-
-  private final AtomicLong hugeBufferSize;
-  private final AtomicLong hugeBufferCount;
-
-  private final int initCap;
-
-  public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
+  public LargeBuffer(ByteBuf buffer) {
     super(buffer);
-    initCap = buffer.capacity();
-    this.hugeBufferCount = hugeBufferCount;
-    this.hugeBufferSize = hugeBufferSize;
   }
 
   @Override
   public ByteBuf copy(int index, int length) {
-    return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
+    return new LargeBuffer(buffer.copy(index, length));
   }
-
-  @Override
-  public boolean release() {
-    return release(1);
-  }
-
-  @Override
-  public boolean release(int decrement) {
-    boolean released = unwrap().release(decrement);
-    if (released) {
-      hugeBufferSize.addAndGet(-initCap);
-      hugeBufferCount.decrementAndGet();
-    }
-    return released;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index f6feb65..a843ac5 100644
--- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -17,7 +17,7 @@
  */
 package io.netty.buffer;
 
-import io.netty.util.internal.StringUtil;
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
 
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
@@ -25,24 +25,16 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.arrow.memory.OutOfMemoryException;
 
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
+import io.netty.util.internal.StringUtil;
 
 /**
  * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
  */
 public class PooledByteBufAllocatorL {
-  private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+  private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
 
   private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
 
-
-  public static final String METRIC_PREFIX = "drill.allocator.";
-
-  private final MetricRegistry registry;
   private final AtomicLong hugeBufferSize = new AtomicLong(0);
   private final AtomicLong hugeBufferCount = new AtomicLong(0);
   private final AtomicLong normalBufferSize = new AtomicLong(0);
@@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL {
   private final InnerAllocator allocator;
   public final UnsafeDirectLittleEndian empty;
 
-  public PooledByteBufAllocatorL(MetricRegistry registry) {
-    this.registry = registry;
+  public PooledByteBufAllocatorL() {
     allocator = new InnerAllocator();
     empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
   }
@@ -70,13 +61,66 @@ public class PooledByteBufAllocatorL {
     return allocator.chunkSize;
   }
 
-  private class InnerAllocator extends PooledByteBufAllocator {
+  public long getHugeBufferSize() {
+    return hugeBufferSize.get();
+  }
 
+  public long getHugeBufferCount() {
+    return hugeBufferCount.get();
+  }
 
+  public long getNormalBufferSize() {
+    return normalBufferSize.get();
+  }
+
+  public long getNormalBufferCount() {
+    return normalBufferSize.get();
+  }
+
+  private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+    private final long initialCapacity;
+    private final AtomicLong count;
+    private final AtomicLong size;
+
+    private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+      super(buf);
+      this.initialCapacity = buf.capacity();
+      this.count = count;
+      this.size = size;
+    }
+
+    private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
+      super(buf);
+      this.initialCapacity = buf.capacity();
+      this.count = count;
+      this.size = size;
+    }
+
+    @Override
+    public ByteBuf copy() {
+      throw new UnsupportedOperationException("copy method is not supported");
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+      throw new UnsupportedOperationException("copy method is not supported");
+    }
+
+    @Override
+    public boolean release(int decrement) {
+      boolean released = super.release(decrement);
+      if (released) {
+        count.decrementAndGet();
+        size.addAndGet(-initialCapacity);
+      }
+      return released;
+    }
+
+  }
+
+  private class InnerAllocator extends PooledByteBufAllocator {
     private final PoolArena<ByteBuffer>[] directArenas;
     private final MemoryStatusThread statusThread;
-    private final Histogram largeBuffersHist;
-    private final Histogram normalBuffersHist;
     private final int chunkSize;
 
     public InnerAllocator() {
@@ -98,50 +142,6 @@ public class PooledByteBufAllocatorL {
       } else {
         statusThread = null;
       }
-      removeOldMetrics();
-
-      registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return normalBufferSize.get();
-        }
-      });
-
-      registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return normalBufferCount.get();
-        }
-      });
-
-      registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return hugeBufferSize.get();
-        }
-      });
-
-      registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return hugeBufferCount.get();
-        }
-      });
-
-      largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
-      normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
-
-    }
-
-
-    private synchronized void removeOldMetrics() {
-      registry.removeMatching(new MetricFilter() {
-        @Override
-        public boolean matches(String name, Metric metric) {
-          return name.startsWith("drill.allocator.");
-        }
-
-      });
     }
 
     private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
@@ -154,12 +154,11 @@ public class PooledByteBufAllocatorL {
           // This is beyond chunk size so we'll allocate separately.
           ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
 
-          hugeBufferCount.incrementAndGet();
           hugeBufferSize.addAndGet(buf.capacity());
-          largeBuffersHist.update(buf.capacity());
-          // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
-          return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+          hugeBufferCount.incrementAndGet();
 
+          // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+          return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
         } else {
           // within chunk, use arena.
           ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
@@ -167,14 +166,14 @@ public class PooledByteBufAllocatorL {
             fail();
           }
 
-          normalBuffersHist.update(buf.capacity());
-          if (ASSERT_ENABLED) {
-            normalBufferSize.addAndGet(buf.capacity());
-            normalBufferCount.incrementAndGet();
+          if (!ASSERT_ENABLED) {
+            return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
           }
 
-          return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
-              normalBufferSize);
+          normalBufferSize.addAndGet(buf.capacity());
+          normalBufferCount.incrementAndGet();
+
+          return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
         }
 
       } else {
@@ -184,9 +183,10 @@ public class PooledByteBufAllocatorL {
 
     private UnsupportedOperationException fail() {
       return new UnsupportedOperationException(
-          "Arrow requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
+          "Arrow requires that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
     }
 
+    @Override
     public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
       if (initialCapacity == 0 && maxCapacity == 0) {
         newDirectBuffer(initialCapacity, maxCapacity);
@@ -215,9 +215,8 @@ public class PooledByteBufAllocatorL {
     private class MemoryStatusThread extends Thread {
 
       public MemoryStatusThread() {
-        super("memory-status-logger");
+        super("allocation.logger");
         this.setDaemon(true);
-        this.setName("allocation.logger");
       }
 
       @Override
@@ -229,12 +228,11 @@ public class PooledByteBufAllocatorL {
           } catch (InterruptedException e) {
             return;
           }
-
         }
       }
-
     }
 
+    @Override
     public String toString() {
       StringBuilder buf = new StringBuilder();
       buf.append(directArenas.length);
@@ -260,13 +258,4 @@ public class PooledByteBufAllocatorL {
 
 
   }
-
-  public static final boolean ASSERT_ENABLED;
-
-  static {
-    boolean isAssertEnabled = false;
-    assert isAssertEnabled = true;
-    ASSERT_ENABLED = isAssertEnabled;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 023a6a2..5ea1767 100644
--- a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -18,8 +18,6 @@
 
 package io.netty.buffer;
 
-import io.netty.util.internal.PlatformDependent;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -32,7 +30,7 @@ import io.netty.util.internal.PlatformDependent;
  * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
  * Netty classes and underlying Netty memory management.
  */
-public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
   private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
@@ -40,35 +38,25 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private final AbstractByteBuf wrapped;
   private final long memoryAddress;
 
-  private final AtomicLong bufferCount;
-  private final AtomicLong bufferSize;
-  private final long initCap;
-
   UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
-    this(buf, true, null, null);
+    this(buf, true);
   }
 
   UnsafeDirectLittleEndian(LargeBuffer buf) {
-    this(buf, true, null, null);
+    this(buf, true);
   }
 
-  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
-    this(buf, true, bufferCount, bufferSize);
+  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+    this(buf, true);
 
   }
 
-  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
+  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
     super(buf);
     if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
       throw new IllegalStateException("Arrow only runs on LittleEndian systems.");
     }
 
-    this.bufferCount = bufferCount;
-    this.bufferSize = bufferSize;
-
-    // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
-    this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;
-
     this.wrapped = buf;
     this.memoryAddress = buf.memoryAddress();
   }
@@ -245,16 +233,6 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   }
 
   @Override
-  public boolean release(int decrement) {
-    final boolean released = super.release(decrement);
-    if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
-      bufferCount.decrementAndGet();
-      bufferSize.addAndGet(-initCap);
-    }
-    return released;
-  }
-
-  @Override
   public int setBytes(int index, InputStream in, int length) throws IOException {
     wrapped.checkIndex(index, length);
     byte[] tmp = new byte[length];

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
new file mode 100644
index 0000000..1b127f8
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+/**
+ * An allocation listener being notified for allocation/deallocation
+ *
+ * It is expected to be called from multiple threads and as such,
+ * provider should take care of making the implementation thread-safe
+ */
+public interface AllocationListener {
+  public static final AllocationListener NOOP = new AllocationListener() {
+    @Override
+    public void onAllocation(long size) {
+    }
+  };
+
+  /**
+   * Called each time a new buffer is allocated
+   *
+   * @param size the buffer size being allocated
+   */
+  void onAllocation(long size);
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index 43ee9c1..f15bb8a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -18,9 +18,6 @@
 package org.apache.arrow.memory;
 
 import static org.apache.arrow.memory.BaseAllocator.indent;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
 
 import java.util.IdentityHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -31,10 +28,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.arrow.memory.BaseAllocator.Verbosity;
 import org.apache.arrow.memory.util.AutoCloseableLock;
 import org.apache.arrow.memory.util.HistoricalLog;
-import org.apache.arrow.memory.util.Metrics;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
 /**
  * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
  * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
@@ -56,7 +56,10 @@ public class AllocationManager {
 
   private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
   private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
-  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(Metrics.getInstance());
+  private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
+
+  static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
+  static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();
 
   private final RootAllocator root;
   private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index dbb0705..9edafbc 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
 import java.util.IdentityHashMap;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.arrow.memory.AllocationManager.BufferLedger;
 import org.apache.arrow.memory.util.AssertionUtil;
@@ -37,14 +36,12 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
 
-  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
-  private static final int CHUNK_SIZE = AllocationManager.INNER_ALLOCATOR.getChunkSize();
-
   public static final int DEBUG_LOG_LENGTH = 6;
   public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
       || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
   private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
 
+  private final AllocationListener listener;
   private final BaseAllocator parentAllocator;
   private final ArrowByteBufAllocator thisAsByteBufAllocator;
   private final IdentityHashMap<BaseAllocator, Object> childAllocators;
@@ -62,12 +59,31 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   private final HistoricalLog historicalLog;
 
   protected BaseAllocator(
+      final AllocationListener listener,
+      final String name,
+      final long initReservation,
+      final long maxAllocation) throws OutOfMemoryException {
+    this(listener, null, name, initReservation, maxAllocation);
+  }
+
+  protected BaseAllocator(
+      final BaseAllocator parentAllocator,
+      final String name,
+      final long initReservation,
+      final long maxAllocation) throws OutOfMemoryException {
+    this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation);
+  }
+
+  private BaseAllocator(
+      final AllocationListener listener,
       final BaseAllocator parentAllocator,
       final String name,
       final long initReservation,
       final long maxAllocation) throws OutOfMemoryException {
     super(parentAllocator, initReservation, maxAllocation);
 
+    this.listener = listener;
+
     if (parentAllocator != null) {
       this.root = parentAllocator.root;
       empty = parentAllocator.empty;
@@ -192,7 +208,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   private ArrowBuf createEmpty(){
     assertOpen();
 
-    return new ArrowBuf(new AtomicInteger(), null, AllocationManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+    return new ArrowBuf(new AtomicInteger(), null, AllocationManager.EMPTY, null, null, 0, 0, true);
   }
 
   @Override
@@ -206,7 +222,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
 
     // round to next largest power of two if we're within a chunk since that is how our allocator operates
-    final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
+    final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ?
         nextPowerOfTwo(initialRequestSize)
         : initialRequestSize;
     AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
@@ -218,6 +234,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     try {
       ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
       success = true;
+      listener.onAllocation(actualRequestSize);
       return buffer;
     } finally {
       if (!success) {
@@ -405,6 +422,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
       try {
         final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
 
+        listener.onAllocation(nBytes);
         if (DEBUG) {
           historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId()));
         }

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 571fc37..57a2c0c 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -24,9 +24,12 @@ import com.google.common.annotations.VisibleForTesting;
  * tree of descendant child allocators.
  */
 public class RootAllocator extends BaseAllocator {
-
   public RootAllocator(final long limit) {
-    super(null, "ROOT", 0, limit);
+    this(AllocationListener.NOOP, limit);
+  }
+
+  public RootAllocator(final AllocationListener listener, final long limit) {
+    super(listener, "ROOT", 0, limit);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
deleted file mode 100644
index 5177a24..0000000
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.memory.util;
-
-import com.codahale.metrics.MetricRegistry;
-
-public class Metrics {
-
-  private Metrics() {
-
-  }
-
-  private static class RegistryHolder {
-    public static final MetricRegistry REGISTRY;
-
-    static {
-      REGISTRY = new MetricRegistry();
-    }
-
-  }
-
-  public static MetricRegistry getInstance() {
-    return RegistryHolder.REGISTRY;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
deleted file mode 100644
index 58ab13b..0000000
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.memory.util;
-
-public class Pointer<T> {
-  public T value;
-
-  public Pointer(){}
-
-  public Pointer(T value){
-    this.value = value;
-  }
-}