You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/12 17:21:45 UTC
arrow git commit: ARROW-385: Refactors metric system
Repository: arrow
Updated Branches:
refs/heads/master 7d3e2a3ab -> c5663c6d0
ARROW-385: Refactors metric system
Arrow has some support for metrics, but the metrics registry is by default
not configured to export values. It also forces user to user yammer/codahale
metrics library instead of the library of their choice.
To allow for integration with other metrics system, replace it with a notification
mechanism to alert user on allocation/deallocation.
Author: Laurent Goujon <la...@dremio.com>
Closes #212 from laurentgo/laurent/metrics-refactoring and squashes the following commits:
e6c435b [Laurent Goujon] ARROW-385: Refactors metric system
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c5663c6d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c5663c6d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c5663c6d
Branch: refs/heads/master
Commit: c5663c6d00dbd297dac573670156e26dc0593357
Parents: 7d3e2a3
Author: Laurent Goujon <la...@dremio.com>
Authored: Thu Jan 12 12:21:37 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jan 12 12:21:37 2017 -0500
----------------------------------------------------------------------
java/memory/pom.xml | 7 -
.../main/java/io/netty/buffer/LargeBuffer.java | 31 +---
.../netty/buffer/PooledByteBufAllocatorL.java | 157 +++++++++----------
.../netty/buffer/UnsafeDirectLittleEndian.java | 34 +---
.../apache/arrow/memory/AllocationListener.java | 40 +++++
.../apache/arrow/memory/AllocationManager.java | 13 +-
.../org/apache/arrow/memory/BaseAllocator.java | 30 +++-
.../org/apache/arrow/memory/RootAllocator.java | 7 +-
.../org/apache/arrow/memory/util/Metrics.java | 40 -----
.../org/apache/arrow/memory/util/Pointer.java | 28 ----
10 files changed, 158 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/pom.xml
----------------------------------------------------------------------
diff --git a/java/memory/pom.xml b/java/memory/pom.xml
index 6ed1448..a4eb652 100644
--- a/java/memory/pom.xml
+++ b/java/memory/pom.xml
@@ -20,13 +20,6 @@
<name>Arrow Memory</name>
<dependencies>
-
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
- </dependency>
-
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
index 5f5e904..c026e43 100644
--- a/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
+++ b/java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -17,43 +17,16 @@
*/
package io.netty.buffer;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {
-
- private final AtomicLong hugeBufferSize;
- private final AtomicLong hugeBufferCount;
-
- private final int initCap;
-
- public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
+ public LargeBuffer(ByteBuf buffer) {
super(buffer);
- initCap = buffer.capacity();
- this.hugeBufferCount = hugeBufferCount;
- this.hugeBufferSize = hugeBufferSize;
}
@Override
public ByteBuf copy(int index, int length) {
- return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
+ return new LargeBuffer(buffer.copy(index, length));
}
-
- @Override
- public boolean release() {
- return release(1);
- }
-
- @Override
- public boolean release(int decrement) {
- boolean released = unwrap().release(decrement);
- if (released) {
- hugeBufferSize.addAndGet(-initCap);
- hugeBufferCount.decrementAndGet();
- }
- return released;
- }
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index f6feb65..a843ac5 100644
--- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -17,7 +17,7 @@
*/
package io.netty.buffer;
-import io.netty.util.internal.StringUtil;
+import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
@@ -25,24 +25,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.OutOfMemoryException;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
+import io.netty.util.internal.StringUtil;
/**
* The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
- private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
-
- public static final String METRIC_PREFIX = "drill.allocator.";
-
- private final MetricRegistry registry;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
@@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL {
private final InnerAllocator allocator;
public final UnsafeDirectLittleEndian empty;
- public PooledByteBufAllocatorL(MetricRegistry registry) {
- this.registry = registry;
+ public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
}
@@ -70,13 +61,66 @@ public class PooledByteBufAllocatorL {
return allocator.chunkSize;
}
- private class InnerAllocator extends PooledByteBufAllocator {
+ public long getHugeBufferSize() {
+ return hugeBufferSize.get();
+ }
+ public long getHugeBufferCount() {
+ return hugeBufferCount.get();
+ }
+ public long getNormalBufferSize() {
+ return normalBufferSize.get();
+ }
+
+ public long getNormalBufferCount() {
+ return normalBufferSize.get();
+ }
+
+ private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
+ private final long initialCapacity;
+ private final AtomicLong count;
+ private final AtomicLong size;
+
+ private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
+ super(buf);
+ this.initialCapacity = buf.capacity();
+ this.count = count;
+ this.size = size;
+ }
+
+ @Override
+ public ByteBuf copy() {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ throw new UnsupportedOperationException("copy method is not supported");
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = super.release(decrement);
+ if (released) {
+ count.decrementAndGet();
+ size.addAndGet(-initialCapacity);
+ }
+ return released;
+ }
+
+ }
+
+ private class InnerAllocator extends PooledByteBufAllocator {
private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
- private final Histogram largeBuffersHist;
- private final Histogram normalBuffersHist;
private final int chunkSize;
public InnerAllocator() {
@@ -98,50 +142,6 @@ public class PooledByteBufAllocatorL {
} else {
statusThread = null;
}
- removeOldMetrics();
-
- registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferSize.get();
- }
- });
-
- registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferCount.get();
- }
- });
-
- registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferSize.get();
- }
- });
-
- registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferCount.get();
- }
- });
-
- largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
- normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
-
- }
-
-
- private synchronized void removeOldMetrics() {
- registry.removeMatching(new MetricFilter() {
- @Override
- public boolean matches(String name, Metric metric) {
- return name.startsWith("drill.allocator.");
- }
-
- });
}
private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
@@ -154,12 +154,11 @@ public class PooledByteBufAllocatorL {
// This is beyond chunk size so we'll allocate separately.
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
- hugeBufferCount.incrementAndGet();
hugeBufferSize.addAndGet(buf.capacity());
- largeBuffersHist.update(buf.capacity());
- // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
- return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+ hugeBufferCount.incrementAndGet();
+ // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+ return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
@@ -167,14 +166,14 @@ public class PooledByteBufAllocatorL {
fail();
}
- normalBuffersHist.update(buf.capacity());
- if (ASSERT_ENABLED) {
- normalBufferSize.addAndGet(buf.capacity());
- normalBufferCount.incrementAndGet();
+ if (!ASSERT_ENABLED) {
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
}
- return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
- normalBufferSize);
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+
+ return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
}
} else {
@@ -184,9 +183,10 @@ public class PooledByteBufAllocatorL {
private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
- "Arrow requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
+ "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
}
+ @Override
public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
newDirectBuffer(initialCapacity, maxCapacity);
@@ -215,9 +215,8 @@ public class PooledByteBufAllocatorL {
private class MemoryStatusThread extends Thread {
public MemoryStatusThread() {
- super("memory-status-logger");
+ super("allocation.logger");
this.setDaemon(true);
- this.setName("allocation.logger");
}
@Override
@@ -229,12 +228,11 @@ public class PooledByteBufAllocatorL {
} catch (InterruptedException e) {
return;
}
-
}
}
-
}
+ @Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(directArenas.length);
@@ -260,13 +258,4 @@ public class PooledByteBufAllocatorL {
}
-
- public static final boolean ASSERT_ENABLED;
-
- static {
- boolean isAssertEnabled = false;
- assert isAssertEnabled = true;
- ASSERT_ENABLED = isAssertEnabled;
- }
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 023a6a2..5ea1767 100644
--- a/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -18,8 +18,6 @@
package io.netty.buffer;
-import io.netty.util.internal.PlatformDependent;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -32,7 +30,7 @@ import io.netty.util.internal.PlatformDependent;
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
* Netty classes and underlying Netty memory management.
*/
-public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
+public class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -40,35 +38,25 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
private final AbstractByteBuf wrapped;
private final long memoryAddress;
- private final AtomicLong bufferCount;
- private final AtomicLong bufferSize;
- private final long initCap;
-
UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
- this(buf, true, null, null);
+ this(buf, true);
}
UnsafeDirectLittleEndian(LargeBuffer buf) {
- this(buf, true, null, null);
+ this(buf, true);
}
- UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
- this(buf, true, bufferCount, bufferSize);
+ UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
+ this(buf, true);
}
- private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
+ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
super(buf);
if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
throw new IllegalStateException("Arrow only runs on LittleEndian systems.");
}
- this.bufferCount = bufferCount;
- this.bufferSize = bufferSize;
-
- // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
- this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;
-
this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
@@ -245,16 +233,6 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
}
@Override
- public boolean release(int decrement) {
- final boolean released = super.release(decrement);
- if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
- bufferCount.decrementAndGet();
- bufferSize.addAndGet(-initCap);
- }
- return released;
- }
-
- @Override
public int setBytes(int index, InputStream in, int length) throws IOException {
wrapped.checkIndex(index, length);
byte[] tmp = new byte[length];
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
new file mode 100644
index 0000000..1b127f8
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+/**
+ * An allocation listener being notified for allocation/deallocation
+ *
+ * It is expected to be called from multiple threads and as such,
+ * provider should take care of making the implementation thread-safe
+ */
+public interface AllocationListener {
+ public static final AllocationListener NOOP = new AllocationListener() {
+ @Override
+ public void onAllocation(long size) {
+ }
+ };
+
+ /**
+ * Called each time a new buffer is allocated
+ *
+ * @param size the buffer size being allocated
+ */
+ void onAllocation(long size);
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
index 43ee9c1..f15bb8a 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -18,9 +18,6 @@
package org.apache.arrow.memory;
import static org.apache.arrow.memory.BaseAllocator.indent;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.PooledByteBufAllocatorL;
-import io.netty.buffer.UnsafeDirectLittleEndian;
import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,10 +28,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.AutoCloseableLock;
import org.apache.arrow.memory.util.HistoricalLog;
-import org.apache.arrow.memory.util.Metrics;
import com.google.common.base.Preconditions;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
/**
* Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
* memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
@@ -56,7 +56,10 @@ public class AllocationManager {
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
- static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(Metrics.getInstance());
+ private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
+
+ static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
+ static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();
private final RootAllocator root;
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index dbb0705..9edafbc 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.AllocationManager.BufferLedger;
import org.apache.arrow.memory.util.AssertionUtil;
@@ -37,14 +36,12 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
- private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
- private static final int CHUNK_SIZE = AllocationManager.INNER_ALLOCATOR.getChunkSize();
-
public static final int DEBUG_LOG_LENGTH = 6;
public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
|| Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
+ private final AllocationListener listener;
private final BaseAllocator parentAllocator;
private final ArrowByteBufAllocator thisAsByteBufAllocator;
private final IdentityHashMap<BaseAllocator, Object> childAllocators;
@@ -62,12 +59,31 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private final HistoricalLog historicalLog;
protected BaseAllocator(
+ final AllocationListener listener,
+ final String name,
+ final long initReservation,
+ final long maxAllocation) throws OutOfMemoryException {
+ this(listener, null, name, initReservation, maxAllocation);
+ }
+
+ protected BaseAllocator(
+ final BaseAllocator parentAllocator,
+ final String name,
+ final long initReservation,
+ final long maxAllocation) throws OutOfMemoryException {
+ this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation);
+ }
+
+ private BaseAllocator(
+ final AllocationListener listener,
final BaseAllocator parentAllocator,
final String name,
final long initReservation,
final long maxAllocation) throws OutOfMemoryException {
super(parentAllocator, initReservation, maxAllocation);
+ this.listener = listener;
+
if (parentAllocator != null) {
this.root = parentAllocator.root;
empty = parentAllocator.empty;
@@ -192,7 +208,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private ArrowBuf createEmpty(){
assertOpen();
- return new ArrowBuf(new AtomicInteger(), null, AllocationManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+ return new ArrowBuf(new AtomicInteger(), null, AllocationManager.EMPTY, null, null, 0, 0, true);
}
@Override
@@ -206,7 +222,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
}
// round to next largest power of two if we're within a chunk since that is how our allocator operates
- final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
+ final int actualRequestSize = initialRequestSize < AllocationManager.CHUNK_SIZE ?
nextPowerOfTwo(initialRequestSize)
: initialRequestSize;
AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
@@ -218,6 +234,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
try {
ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
success = true;
+ listener.onAllocation(actualRequestSize);
return buffer;
} finally {
if (!success) {
@@ -405,6 +422,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
try {
final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+ listener.onAllocation(nBytes);
if (DEBUG) {
historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId()));
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 571fc37..57a2c0c 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -24,9 +24,12 @@ import com.google.common.annotations.VisibleForTesting;
* tree of descendant child allocators.
*/
public class RootAllocator extends BaseAllocator {
-
public RootAllocator(final long limit) {
- super(null, "ROOT", 0, limit);
+ this(AllocationListener.NOOP, limit);
+ }
+
+ public RootAllocator(final AllocationListener listener, final long limit) {
+ super(listener, "ROOT", 0, limit);
}
/**
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
deleted file mode 100644
index 5177a24..0000000
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/Metrics.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.memory.util;
-
-import com.codahale.metrics.MetricRegistry;
-
-public class Metrics {
-
- private Metrics() {
-
- }
-
- private static class RegistryHolder {
- public static final MetricRegistry REGISTRY;
-
- static {
- REGISTRY = new MetricRegistry();
- }
-
- }
-
- public static MetricRegistry getInstance() {
- return RegistryHolder.REGISTRY;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/arrow/blob/c5663c6d/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java b/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
deleted file mode 100644
index 58ab13b..0000000
--- a/java/memory/src/main/java/org/apache/arrow/memory/util/Pointer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.memory.util;
-
-public class Pointer<T> {
- public T value;
-
- public Pointer(){}
-
- public Pointer(T value){
- this.value = value;
- }
-}