You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:33 UTC
[10/13] drill git commit: DRILL-4134: Allocator Improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
deleted file mode 100644
index b8d0fb2..0000000
--- a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.netty.buffer;
-
-import org.apache.drill.exec.memory.Accountor;
-import org.apache.drill.exec.memory.AllocationReservation;
-import org.apache.drill.exec.memory.AllocatorOwner;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.util.Pointer;
-
-class FakeAllocator implements BufferAllocator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FakeAllocator.class);
-
-
- public static final Accountor FAKE_ACCOUNTOR = new FakeAccountor();
- public static final BufferAllocator FAKE_ALLOCATOR = new FakeAllocator();
-
- @Override
- public DrillBuf buffer(int size) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DrillBuf buffer(int minSize, int maxSize) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ByteBufAllocator getUnderlyingAllocator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
- boolean applyFragmentLimit)
- throws OutOfMemoryException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DrillBuf getEmpty() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean takeOwnership(DrillBuf buf) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setFragmentLimit(long l) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getFragmentLimit(){
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public long getAllocatedMemory() {
- return 0;
- }
-
- @Override
- public long getPeakMemoryAllocation() {
- return 0;
- }
-
- static class FakeAccountor extends Accountor {
-
- public FakeAccountor() {
- super(null, false, null, null, 0, 0, true);
- }
-
- @Override
- public boolean transferTo(Accountor target, DrillBuf buf, long size) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getAvailable() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getCapacity() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getAllocation() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean reserve(long size) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean forceAdditionalReservation(long size) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void reserved(long expected, DrillBuf buf) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void releasePartial(DrillBuf buf, long size) {
-
- }
-
- @Override
- public void release(DrillBuf buf, long size) {
-
- }
-
- @Override
- public void close() {
-
- }
- }
-
- @Override
- public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
- long initReservation, long maxAllocation, int flags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getId() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public AllocationReservation newReservation() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java b/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
index f1d4842..5f5e904 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -17,151 +17,23 @@
*/
package io.netty.buffer;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
import java.util.concurrent.atomic.AtomicLong;
/**
- * This is basically a complete copy of DuplicatedByteBuf. We copy because we can't override the release methods to keep
- * global track of created Large Buffers.
+ * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
*/
-public class LargeBuffer extends AbstractByteBuf {
+public class LargeBuffer extends MutableWrappedByteBuf {
private final AtomicLong hugeBufferSize;
private final AtomicLong hugeBufferCount;
- @Override
- public ByteBuffer nioBuffer(int index, int length) {
- return unwrap().nioBuffer(index, length);
- }
-
- private final ByteBuf buffer;
private final int initCap;
public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
- super(buffer.maxCapacity());
+ super(buffer);
initCap = buffer.capacity();
this.hugeBufferCount = hugeBufferCount;
this.hugeBufferSize = hugeBufferSize;
-
- if (buffer instanceof LargeBuffer) {
- this.buffer = ((LargeBuffer) buffer).buffer;
- } else {
- this.buffer = buffer;
- }
-
- setIndex(buffer.readerIndex(), buffer.writerIndex());
- }
-
- @Override
- public ByteBuf unwrap() {
- return buffer;
- }
-
- @Override
- public ByteBufAllocator alloc() {
- return buffer.alloc();
- }
-
- @Override
- public ByteOrder order() {
- return buffer.order();
- }
-
- @Override
- public boolean isDirect() {
- return buffer.isDirect();
- }
-
- @Override
- public int capacity() {
- return buffer.capacity();
- }
-
- @Override
- public ByteBuf capacity(int newCapacity) {
- buffer.capacity(newCapacity);
- return this;
- }
-
- @Override
- public boolean hasArray() {
- return buffer.hasArray();
- }
-
- @Override
- public byte[] array() {
- return buffer.array();
- }
-
- @Override
- public int arrayOffset() {
- return buffer.arrayOffset();
- }
-
- @Override
- public boolean hasMemoryAddress() {
- return buffer.hasMemoryAddress();
- }
-
- @Override
- public long memoryAddress() {
- return buffer.memoryAddress();
- }
-
- @Override
- public byte getByte(int index) {
- return _getByte(index);
- }
-
- @Override
- protected byte _getByte(int index) {
- return buffer.getByte(index);
- }
-
- @Override
- public short getShort(int index) {
- return _getShort(index);
- }
-
- @Override
- protected short _getShort(int index) {
- return buffer.getShort(index);
- }
-
- @Override
- public int getUnsignedMedium(int index) {
- return _getUnsignedMedium(index);
- }
-
- @Override
- protected int _getUnsignedMedium(int index) {
- return buffer.getUnsignedMedium(index);
- }
-
- @Override
- public int getInt(int index) {
- return _getInt(index);
- }
-
- @Override
- protected int _getInt(int index) {
- return buffer.getInt(index);
- }
-
- @Override
- public long getLong(int index) {
- return _getLong(index);
- }
-
- @Override
- protected long _getLong(int index) {
- return buffer.getLong(index);
}
@Override
@@ -170,169 +42,6 @@ public class LargeBuffer extends AbstractByteBuf {
}
@Override
- public ByteBuf slice(int index, int length) {
- return new SlicedByteBuf(this, index, length);
- }
-
- @Override
- public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
- buffer.getBytes(index, dst, dstIndex, length);
- return this;
- }
-
- @Override
- public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
- buffer.getBytes(index, dst, dstIndex, length);
- return this;
- }
-
- @Override
- public ByteBuf getBytes(int index, ByteBuffer dst) {
- buffer.getBytes(index, dst);
- return this;
- }
-
- @Override
- public ByteBuf setByte(int index, int value) {
- _setByte(index, value);
- return this;
- }
-
- @Override
- protected void _setByte(int index, int value) {
- buffer.setByte(index, value);
- }
-
- @Override
- public ByteBuf setShort(int index, int value) {
- _setShort(index, value);
- return this;
- }
-
- @Override
- protected void _setShort(int index, int value) {
- buffer.setShort(index, value);
- }
-
- @Override
- public ByteBuf setMedium(int index, int value) {
- _setMedium(index, value);
- return this;
- }
-
- @Override
- protected void _setMedium(int index, int value) {
- buffer.setMedium(index, value);
- }
-
- @Override
- public ByteBuf setInt(int index, int value) {
- _setInt(index, value);
- return this;
- }
-
- @Override
- protected void _setInt(int index, int value) {
- buffer.setInt(index, value);
- }
-
- @Override
- public ByteBuf setLong(int index, long value) {
- _setLong(index, value);
- return this;
- }
-
- @Override
- protected void _setLong(int index, long value) {
- buffer.setLong(index, value);
- }
-
- @Override
- public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- buffer.setBytes(index, src, srcIndex, length);
- return this;
- }
-
- @Override
- public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
- buffer.setBytes(index, src, srcIndex, length);
- return this;
- }
-
- @Override
- public ByteBuf setBytes(int index, ByteBuffer src) {
- buffer.setBytes(index, src);
- return this;
- }
-
- @Override
- public ByteBuf getBytes(int index, OutputStream out, int length)
- throws IOException {
- buffer.getBytes(index, out, length);
- return this;
- }
-
- @Override
- public int getBytes(int index, GatheringByteChannel out, int length)
- throws IOException {
- return buffer.getBytes(index, out, length);
- }
-
- @Override
- public int setBytes(int index, InputStream in, int length)
- throws IOException {
- return buffer.setBytes(index, in, length);
- }
-
- @Override
- public int setBytes(int index, ScatteringByteChannel in, int length)
- throws IOException {
- return buffer.setBytes(index, in, length);
- }
-
- @Override
- public int nioBufferCount() {
- return buffer.nioBufferCount();
- }
-
- @Override
- public ByteBuffer[] nioBuffers(int index, int length) {
- return buffer.nioBuffers(index, length);
- }
-
- @Override
- public ByteBuffer internalNioBuffer(int index, int length) {
- return nioBuffer(index, length);
- }
-
- @Override
- public int forEachByte(int index, int length, ByteBufProcessor processor) {
- return buffer.forEachByte(index, length, processor);
- }
-
- @Override
- public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
- return buffer.forEachByteDesc(index, length, processor);
- }
-
- @Override
- public final int refCnt() {
- return unwrap().refCnt();
- }
-
- @Override
- public final ByteBuf retain() {
- unwrap().retain();
- return this;
- }
-
- @Override
- public final ByteBuf retain(int increment) {
- unwrap().retain(increment);
- return this;
- }
-
- @Override
public boolean release() {
return release(1);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000..5709473
--- /dev/null
+++ b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
+ * buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return unwrap().nioBuffer(index, length);
+ }
+
+ ByteBuf buffer;
+
+ public MutableWrappedByteBuf(ByteBuf buffer) {
+ super(buffer.maxCapacity());
+
+ if (buffer instanceof MutableWrappedByteBuf) {
+ this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+ } else {
+ this.buffer = buffer;
+ }
+
+ setIndex(buffer.readerIndex(), buffer.writerIndex());
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return buffer;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return buffer.alloc();
+ }
+
+ @Override
+ public ByteOrder order() {
+ return buffer.order();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+ @Override
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ buffer.capacity(newCapacity);
+ return this;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return buffer.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return buffer.hasMemoryAddress();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return buffer.memoryAddress();
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return _getByte(index);
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return buffer.getByte(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return _getShort(index);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ return _getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return buffer.getUnsignedMedium(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return _getInt(index);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return _getLong(index);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return buffer.getLong(index);
+ }
+
+ @Override
+ public abstract ByteBuf copy(int index, int length);
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ buffer.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ _setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ buffer.setByte(index, value);
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ _setShort(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ buffer.setShort(index, value);
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ _setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ buffer.setMedium(index, value);
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ _setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ buffer.setInt(index, value);
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ _setLong(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ buffer.setLong(index, value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ buffer.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length)
+ throws IOException {
+ buffer.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, length);
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return buffer.nioBufferCount();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return buffer.nioBuffers(index, length);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ return nioBuffer(index, length);
+ }
+
+ @Override
+ public int forEachByte(int index, int length, ByteBufProcessor processor) {
+ return buffer.forEachByte(index, length, processor);
+ }
+
+ @Override
+ public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+ return buffer.forEachByteDesc(index, length, processor);
+ }
+
+ @Override
+ public final int refCnt() {
+ return unwrap().refCnt();
+ }
+
+ @Override
+ public final ByteBuf retain() {
+ unwrap().retain();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain(int increment) {
+ unwrap().retain(increment);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = unwrap().release(decrement);
+ return released;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 2fc1bd0..47dbf59 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -23,193 +23,227 @@ import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
-
+/**
+ * The base allocator that we use for all of Drill's memory management. Returns UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+
private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
private static final String METRIC_PREFIX = "drill.allocator.";
+
private final MetricRegistry registry;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);
- private final PoolArena<ByteBuffer>[] directArenas;
- private final MemoryStatusThread statusThread;
- private final Histogram largeBuffersHist;
- private final Histogram normalBuffersHist;
+ public final InnerAllocator allocator;
+ public final UnsafeDirectLittleEndian empty;
public PooledByteBufAllocatorL(MetricRegistry registry) {
- super(true);
this.registry = registry;
+ allocator = new InnerAllocator();
+ empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+ }
+
+ public UnsafeDirectLittleEndian allocate(int size) {
try {
- Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
- f.setAccessible(true);
- this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
- } catch (Exception e) {
- throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ return allocator.directBuffer(size, size);
+ } catch (OutOfMemoryError e) {
+ throw new OutOfMemoryException("Failure allocating buffer.", e);
}
- if (memoryLogger.isTraceEnabled()) {
- statusThread = new MemoryStatusThread();
- statusThread.start();
- } else {
- statusThread = null;
- }
- removeOldMetrics();
+ }
- registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferSize.get();
- }
- });
+ public int getChunkSize() {
+ return allocator.chunkSize;
+ }
- registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferCount.get();
- }
- });
+ private class InnerAllocator extends PooledByteBufAllocator {
- registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferSize.get();
- }
- });
- registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferCount.get();
- }
- });
+ private final PoolArena<ByteBuffer>[] directArenas;
+ private final MemoryStatusThread statusThread;
+ private final Histogram largeBuffersHist;
+ private final Histogram normalBuffersHist;
+ private final int chunkSize;
- largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
- normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+ public InnerAllocator() {
+ super(true);
- }
+ try {
+ Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+ f.setAccessible(true);
+ this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ }
- private synchronized void removeOldMetrics() {
- registry.removeMatching(new MetricFilter() {
- @Override
- public boolean matches(String name, Metric metric) {
- return name.startsWith("drill.allocator.");
+ this.chunkSize = directArenas[0].chunkSize;
+
+ if (memoryLogger.isTraceEnabled()) {
+ statusThread = new MemoryStatusThread();
+ statusThread.start();
+ } else {
+ statusThread = null;
}
+ removeOldMetrics();
- });
- }
+ registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferSize.get();
+ }
+ });
- @Override
- protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
- }
+ registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferCount.get();
+ }
+ });
- @Override
- protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity, int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<ByteBuffer> directArena = cache.directArena;
+ registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferSize.get();
+ }
+ });
- if (directArena != null) {
+ registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferCount.get();
+ }
+ });
- if (initialCapacity > directArena.chunkSize) {
- // This is beyond chunk size so we'll allocate separately.
- ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+ largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+ normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
- hugeBufferCount.incrementAndGet();
- hugeBufferSize.addAndGet(buf.capacity());
- largeBuffersHist.update(buf.capacity());
- // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
- return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+ }
- } else {
- // within chunk, use arena.
- ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
- fail();
+
+ private synchronized void removeOldMetrics() {
+ registry.removeMatching(new MetricFilter() {
+ @Override
+ public boolean matches(String name, Metric metric) {
+ return name.startsWith("drill.allocator.");
}
- normalBuffersHist.update(buf.capacity());
- if (ASSERT_ENABLED) {
- normalBufferSize.addAndGet(buf.capacity());
- normalBufferCount.incrementAndGet();
+ });
+ }
+
+ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+ PoolThreadCache cache = threadCache.get();
+ PoolArena<ByteBuffer> directArena = cache.directArena;
+
+ if (directArena != null) {
+
+ if (initialCapacity > directArena.chunkSize) {
+ // This is beyond chunk size so we'll allocate separately.
+ ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+ hugeBufferCount.incrementAndGet();
+ hugeBufferSize.addAndGet(buf.capacity());
+ largeBuffersHist.update(buf.capacity());
+ // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+ return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+
+ } else {
+ // within chunk, use arena.
+ ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+ if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+ fail();
+ }
+
+ normalBuffersHist.update(buf.capacity());
+ if (ASSERT_ENABLED) {
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+ }
+
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
+ normalBufferSize);
}
- return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
+ } else {
+ throw fail();
}
-
- } else {
- throw fail();
}
- }
-
- private UnsupportedOperationException fail() {
- return new UnsupportedOperationException(
- "Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
- }
+ private UnsupportedOperationException fail() {
+ return new UnsupportedOperationException(
+ "Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
+ }
- @Override
- public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+ public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
- newDirectBuffer(initialCapacity, maxCapacity);
+ newDirectBuffer(initialCapacity, maxCapacity);
}
validate(initialCapacity, maxCapacity);
- return newDirectBuffer(initialCapacity, maxCapacity);
- }
+ return newDirectBufferL(initialCapacity, maxCapacity);
+ }
- @Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
- }
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
+ }
- private static void validate(int initialCapacity, int maxCapacity) {
- if (initialCapacity < 0) {
+ private void validate(int initialCapacity, int maxCapacity) {
+ if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
- }
- if (initialCapacity > maxCapacity) {
+ }
+ if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
- "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
- initialCapacity, maxCapacity));
+ "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+ initialCapacity, maxCapacity));
+ }
}
- }
- private class MemoryStatusThread extends Thread {
+ private class MemoryStatusThread extends Thread {
- public MemoryStatusThread() {
- super("memory-status-logger");
- this.setDaemon(true);
- this.setName("allocation.logger");
- }
+ public MemoryStatusThread() {
+ super("memory-status-logger");
+ this.setDaemon(true);
+ this.setName("allocation.logger");
+ }
- @Override
- public void run() {
- while (true) {
- memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
- try {
- Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
- } catch (InterruptedException e) {
- return;
- }
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
}
- }
- }
+ }
- public void checkAndReset() {
- if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
+ public String toString() {
StringBuilder buf = new StringBuilder();
+ buf.append(directArenas.length);
+ buf.append(" direct arena(s):");
+ buf.append(StringUtil.NEWLINE);
+ for (PoolArena<ByteBuffer> a : directArenas) {
+ buf.append(a);
+ }
+
buf.append("Large buffers outstanding: ");
buf.append(hugeBufferCount.get());
buf.append(" totaling ");
@@ -221,35 +255,10 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
buf.append(" totaling ");
buf.append(normalBufferSize.get());
buf.append(" bytes.");
- hugeBufferCount.set(0);
- normalBufferCount.set(0);
- hugeBufferSize.set(0);
- normalBufferSize.set(0);
- throw new IllegalStateException(buf.toString());
+ return buf.toString();
}
- }
- public String toString() {
- StringBuilder buf = new StringBuilder();
- buf.append(directArenas.length);
- buf.append(" direct arena(s):");
- buf.append(StringUtil.NEWLINE);
- for (PoolArena<ByteBuffer> a : directArenas) {
- buf.append(a);
- }
- buf.append("Large buffers outstanding: ");
- buf.append(this.hugeBufferCount.get());
- buf.append(" totaling ");
- buf.append(this.hugeBufferSize.get());
- buf.append(" bytes.");
- buf.append('\n');
- buf.append("Normal buffers outstanding: ");
- buf.append(this.normalBufferCount.get());
- buf.append(" totaling ");
- buf.append(this.normalBufferSize.get());
- buf.append(" bytes.");
- return buf.toString();
}
public static final boolean ASSERT_ENABLED;
@@ -259,4 +268,5 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 419aef3..12e9907 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -23,92 +23,46 @@ import io.netty.util.internal.PlatformDependent;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath DrillBufs to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private final AbstractByteBuf wrapped;
private final long memoryAddress;
- private static final boolean TRACK_BUFFERS = false;
- private AtomicLong bufferCount;
- private AtomicLong bufferSize;
- private long initCap = -1;
- private final static IdentityHashMap<UnsafeDirectLittleEndian, StackTrace> bufferMap = new IdentityHashMap<>();
+ private final AtomicLong bufferCount;
+ private final AtomicLong bufferSize;
+ private final long initCap;
- @Override
- public boolean release() {
- return release(1);
+ UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+ this(buf, true, null, null);
}
- @Override
- public boolean release(int decrement) {
- boolean released = super.release(decrement);
- if (TRACK_BUFFERS) {
- if (released) {
- final Object object;
- synchronized (bufferMap) {
- object = bufferMap.remove(this);
- }
- if (object == null) {
- throw new IllegalStateException("no such buffer");
- }
-
- if (initCap != -1) {
- bufferCount.decrementAndGet();
- bufferSize.addAndGet(-initCap);
- }
- }
- }
-
- return released;
+ UnsafeDirectLittleEndian(LargeBuffer buf) {
+ this(buf, true, null, null);
}
+ UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
+ this(buf, true, bufferCount, bufferSize);
- public static int getBufferCount() {
- return bufferMap.size();
- }
-
- public static void releaseBuffers() {
- synchronized(bufferMap) {
- final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
- final LinkedList<UnsafeDirectLittleEndian> bufferList = new LinkedList<>(bufferSet);
- while(!bufferList.isEmpty()) {
- final UnsafeDirectLittleEndian udle = bufferList.removeFirst();
- udle.release(udle.refCnt());
- }
- }
}
- public static void logBuffers(final Logger logger) {
- synchronized (bufferMap) {
- int count = 0;
- final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
- for (final UnsafeDirectLittleEndian udle : bufferSet) {
- final StackTrace stackTrace = bufferMap.get(udle);
- ++count;
- logger.debug("#" + count + " active buffer allocated at\n" + stackTrace);
- }
+ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
+ super(buf);
+ if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+ throw new IllegalStateException("Drill only runs on LittleEndian systems.");
}
- }
- UnsafeDirectLittleEndian(LargeBuffer buf) {
- this(buf, true);
- }
- UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
- this(buf, true);
this.bufferCount = bufferCount;
this.bufferSize = bufferSize;
// initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
- this.initCap = ASSERT_ENABLED ? capacity() : -1;
- }
+ this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;
- private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
- super(buf);
- if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
- throw new IllegalStateException("Drill only runs on LittleEndian systems.");
- }
- wrapped = buf;
- memoryAddress = buf.memoryAddress();
+ this.wrapped = buf;
+ this.memoryAddress = buf.memoryAddress();
}
private long addr(int index) {
return memoryAddress + index;
@@ -147,142 +101,140 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
}
@Override
- public double getDouble(int index) {
- return Double.longBitsToDouble(getLong(index));
- }
+ public double getDouble(int index) {
+ return Double.longBitsToDouble(getLong(index));
+ }
- @Override
- public char getChar(int index) {
- return (char) getShort(index);
- }
+ @Override
+ public char getChar(int index) {
+ return (char) getShort(index);
+ }
- @Override
- public long getUnsignedInt(int index) {
- return getInt(index) & 0xFFFFFFFFL;
- }
+ @Override
+ public long getUnsignedInt(int index) {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
- @Override
- public int getInt(int index) {
-// wrapped.checkIndex(index, 4);
- int v = PlatformDependent.getInt(addr(index));
- return v;
- }
+ @Override
+ public int getInt(int index) {
+ int v = PlatformDependent.getInt(addr(index));
+ return v;
+ }
- @Override
- public int getUnsignedShort(int index) {
- return getShort(index) & 0xFFFF;
- }
+ @Override
+ public int getUnsignedShort(int index) {
+ return getShort(index) & 0xFFFF;
+ }
- @Override
- public short getShort(int index) {
-// wrapped.checkIndex(index, 2);
- short v = PlatformDependent.getShort(addr(index));
- return v;
- }
+ @Override
+ public short getShort(int index) {
+ short v = PlatformDependent.getShort(addr(index));
+ return v;
+ }
- @Override
- public ByteBuf setShort(int index, int value) {
- wrapped.checkIndex(index, 2);
- _setShort(index, value);
- return this;
- }
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ wrapped.checkIndex(index, 2);
+ _setShort(index, value);
+ return this;
+ }
- @Override
- public ByteBuf setInt(int index, int value) {
- wrapped.checkIndex(index, 4);
- _setInt(index, value);
- return this;
- }
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ wrapped.checkIndex(index, 4);
+ _setInt(index, value);
+ return this;
+ }
- @Override
- public ByteBuf setLong(int index, long value) {
- wrapped.checkIndex(index, 8);
- _setLong(index, value);
- return this;
- }
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ wrapped.checkIndex(index, 8);
+ _setLong(index, value);
+ return this;
+ }
- @Override
- public ByteBuf setChar(int index, int value) {
- setShort(index, value);
- return this;
- }
+ @Override
+ public ByteBuf setChar(int index, int value) {
+ setShort(index, value);
+ return this;
+ }
- @Override
- public ByteBuf setFloat(int index, float value) {
- setInt(index, Float.floatToRawIntBits(value));
- return this;
- }
+ @Override
+ public ByteBuf setFloat(int index, float value) {
+ setInt(index, Float.floatToRawIntBits(value));
+ return this;
+ }
- @Override
- public ByteBuf setDouble(int index, double value) {
- setLong(index, Double.doubleToRawLongBits(value));
- return this;
- }
+ @Override
+ public ByteBuf setDouble(int index, double value) {
+ setLong(index, Double.doubleToRawLongBits(value));
+ return this;
+ }
- @Override
- public ByteBuf writeShort(int value) {
- wrapped.ensureWritable(2);
- _setShort(wrapped.writerIndex, value);
- wrapped.writerIndex += 2;
- return this;
- }
+ @Override
+ public ByteBuf writeShort(int value) {
+ wrapped.ensureWritable(2);
+ _setShort(wrapped.writerIndex, value);
+ wrapped.writerIndex += 2;
+ return this;
+ }
- @Override
- public ByteBuf writeInt(int value) {
- wrapped.ensureWritable(4);
- _setInt(wrapped.writerIndex, value);
- wrapped.writerIndex += 4;
- return this;
- }
+ @Override
+ public ByteBuf writeInt(int value) {
+ wrapped.ensureWritable(4);
+ _setInt(wrapped.writerIndex, value);
+ wrapped.writerIndex += 4;
+ return this;
+ }
- @Override
- public ByteBuf writeLong(long value) {
- wrapped.ensureWritable(8);
- _setLong(wrapped.writerIndex, value);
- wrapped.writerIndex += 8;
- return this;
- }
+ @Override
+ public ByteBuf writeLong(long value) {
+ wrapped.ensureWritable(8);
+ _setLong(wrapped.writerIndex, value);
+ wrapped.writerIndex += 8;
+ return this;
+ }
- @Override
- public ByteBuf writeChar(int value) {
- writeShort(value);
- return this;
- }
+ @Override
+ public ByteBuf writeChar(int value) {
+ writeShort(value);
+ return this;
+ }
- @Override
- public ByteBuf writeFloat(float value) {
- writeInt(Float.floatToRawIntBits(value));
- return this;
- }
+ @Override
+ public ByteBuf writeFloat(float value) {
+ writeInt(Float.floatToRawIntBits(value));
+ return this;
+ }
- @Override
- public ByteBuf writeDouble(double value) {
- writeLong(Double.doubleToRawLongBits(value));
- return this;
- }
+ @Override
+ public ByteBuf writeDouble(double value) {
+ writeLong(Double.doubleToRawLongBits(value));
+ return this;
+ }
- private void _setShort(int index, int value) {
- PlatformDependent.putShort(addr(index), (short) value);
- }
+ private void _setShort(int index, int value) {
+ PlatformDependent.putShort(addr(index), (short) value);
+ }
- private void _setInt(int index, int value) {
- PlatformDependent.putInt(addr(index), value);
- }
+ private void _setInt(int index, int value) {
+ PlatformDependent.putInt(addr(index), value);
+ }
- private void _setLong(int index, long value) {
- PlatformDependent.putLong(addr(index), value);
- }
+ private void _setLong(int index, long value) {
+ PlatformDependent.putLong(addr(index), value);
+ }
- @Override
- public byte getByte(int index) {
- return PlatformDependent.getByte(addr(index));
- }
+ @Override
+ public byte getByte(int index) {
+ return PlatformDependent.getByte(addr(index));
+ }
- @Override
- public ByteBuf setByte(int index, int value) {
- PlatformDependent.putByte(addr(index), (byte) value);
- return this;
- }
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ PlatformDependent.putByte(addr(index), (byte) value);
+ return this;
+ }
@Override
public boolean release() {
@@ -291,8 +243,8 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
@Override
public boolean release(int decrement) {
- boolean released = super.release(decrement);
- if (released && initCap != -1) {
+ final boolean released = super.release(decrement);
+ if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
bufferCount.decrementAndGet();
bufferSize.addAndGet(-initCap);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
new file mode 100644
index 0000000..8bcf6a0
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All
+ * operations are threadsafe (except for close).
+ */
+@ThreadSafe
+class Accountant implements AutoCloseable {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+
+ /**
+ * The parent allocator
+ */
+ protected final Accountant parent;
+
+ /**
+ * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the
+ * parent Accountant until this Accountant is closed.
+ */
+ protected final long reservation;
+
+ private final AtomicLong peakAllocation = new AtomicLong();
+
+ /**
+ * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to
+ * change but will change responses to future allocation efforts
+ */
+ private final AtomicLong allocationLimit = new AtomicLong();
+
+ /**
+ * Currently allocated amount of memory;
+ */
+ private final AtomicLong locallyHeldMemory = new AtomicLong();
+
+ public Accountant(Accountant parent, long reservation, long maxAllocation) {
+ Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
+ Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
+ Preconditions.checkArgument(reservation <= maxAllocation,
+ "The initial reservation size must be <= the maximum allocation.");
+ Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
+
+ this.parent = parent;
+ this.reservation = reservation;
+ this.allocationLimit.set(maxAllocation);
+
+ if (reservation != 0) {
+ // we will allocate a reservation from our parent.
+ final AllocationOutcome outcome = parent.allocateBytes(reservation);
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(String.format(
+ "Failure trying to allocate initial reservation for Allocator. "
+ + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name()));
+ }
+ }
+ }
+
+ /**
+ * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a
+ * log of delta
+ *
+ * If it fails, no changes are made to accounting.
+ *
+ * @param size
+ * The amount of memory to reserve in bytes.
+ * @return True if the allocation was successful, false if the allocation failed.
+ */
+ AllocationOutcome allocateBytes(long size) {
+ final AllocationOutcome outcome = allocate(size, true, false);
+ if (!outcome.isOk()) {
+ releaseBytes(size);
+ }
+ return outcome;
+ }
+
+ private void updatePeak() {
+ final long currentMemory = locallyHeldMemory.get();
+ while (true) {
+
+ final long previousPeak = peakAllocation.get();
+ if (currentMemory > previousPeak) {
+ if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) {
+ // peak allocation changed underneath us. try again.
+ continue;
+ }
+ }
+
+ // we either succeeded to set peak allocation or we weren't above the previous peak, exit.
+ return;
+ }
+ }
+
+
+ /**
+ * Increase the accounting. Returns whether the allocation fit within limits.
+ *
+ * @param size
+ * to increase
+ * @return Whether the allocation fit within limits.
+ */
+ boolean forceAllocate(long size) {
+ final AllocationOutcome outcome = allocate(size, true, true);
+ return outcome.isOk();
+ }
+
+ /**
+ * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation
+ * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence
+ * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at
+ * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to
+ * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know
+ * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force
+ * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur
+ * despite our moving past one or more limits.
+ *
+ * @param size
+ * The size of the allocation.
+ * @param incomingUpdatePeak
+ * Whether we should update the local peak for this allocation.
+ * @param forceAllocation
+ * Whether we should force the allocation.
+ * @return The outcome of the allocation.
+ */
+ private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) {
+ final long newLocal = locallyHeldMemory.addAndGet(size);
+ final long beyondReservation = newLocal - reservation;
+ final boolean beyondLimit = newLocal > allocationLimit.get();
+ final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit);
+
+ AllocationOutcome parentOutcome = AllocationOutcome.SUCCESS;
+ if (beyondReservation > 0 && parent != null) {
+ // we need to get memory from our parent.
+ final long parentRequest = Math.min(beyondReservation, size);
+ parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation);
+ }
+
+ final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL :
+ parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT;
+
+ if (updatePeak) {
+ updatePeak();
+ }
+
+ return finalOutcome;
+ }
+
+ public void releaseBytes(long size) {
+ // reduce local memory. all memory released above reservation should be released up the tree.
+ final long newSize = locallyHeldMemory.addAndGet(-size);
+
+ Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
+
+ final long originalSize = newSize + size;
+ if(originalSize > reservation && parent != null){
+ // we deallocated memory that we should release to our parent.
+ final long possibleAmountToReleaseToParent = originalSize - reservation;
+ final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
+ parent.releaseBytes(actualToReleaseToParent);
+ }
+
+ }
+
+ /**
+ * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation.
+ *
+ * @param newLimit
+ * The limit in bytes.
+ */
+ public void setLimit(long newLimit) {
+ allocationLimit.set(newLimit);
+ }
+
+ public boolean isOverLimit() {
+ return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
+ }
+
+ /**
+ * Close this Accountant. This will release any reservation bytes back to a parent Accountant.
+ */
+ public void close() {
+ // return memory reservation to parent allocator.
+ if (parent != null) {
+ parent.releaseBytes(reservation);
+ }
+ }
+
+ /**
+ * Return the current limit of this Accountant.
+ *
+ * @return Limit in bytes.
+ */
+ public long getLimit() {
+ return allocationLimit.get();
+ }
+
+ /**
+ * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not
+ * include reservation memory that hasn't been allocated.
+ *
+ * @return Currently allocate memory in bytes.
+ */
+ public long getAllocatedMemory() {
+ return locallyHeldMemory.get();
+ }
+
+ /**
+ * The peak memory allocated by this Accountant.
+ *
+ * @return The peak allocated memory in bytes.
+ */
+ public long getPeakMemoryAllocation() {
+ return peakAllocation.get();
+ }
+
+ /**
+ * Describes the type of outcome that occurred when trying to account for allocation of memory.
+ */
+ public static enum AllocationOutcome {
+
+ /**
+ * Allocation succeeded.
+ */
+ SUCCESS(true),
+
+ /**
+ * Allocation succeeded but only because the allocator was forced to move beyond a limit.
+ */
+ FORCED_SUCESS(true),
+
+ /**
+ * Allocation failed because the local allocator's limits were exceeded.
+ */
+ FAILED_LOCAL(false),
+
+ /**
+ * Allocation failed because a parent allocator's limits were exceeded.
+ */
+ FAILED_PARENT(false);
+
+ private final boolean ok;
+
+ AllocationOutcome(boolean ok) {
+ this.ok = ok;
+ }
+
+ public boolean isOk() {
+ return ok;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
deleted file mode 100644
index 7014a0b..0000000
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import io.netty.buffer.DrillBuf;
-
-public interface Accountor extends AutoCloseable{
-
- public boolean transferTo(Accountor target, DrillBuf buf, long size);
- public boolean transferIn(DrillBuf buf, long size);
- public long getAvailable();
- public long getCapacity();
- public long getAllocation();
- public long getPeakMemoryAllocation();
-
- public boolean reserve(long size);
- public boolean forceAdditionalReservation(long size);
-
- public void reserved(long expected, DrillBuf buf);
-
- public void release(DrillBuf buf, long size);
- public void releasePartial(DrillBuf buf, long size);
- public long resetFragmentLimits();
- public void close();
-
- public void setFragmentLimit(long add);
- public long getFragmentLimit();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
new file mode 100644
index 0000000..65a1386
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they
+ * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception
+ * if it is used more than once.
+ * <p>
+ * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not.
+ * This is not threadsafe.
+ */
+public interface AllocationReservation extends AutoCloseable {
+
+ /**
+ * Add to the current reservation.
+ *
+ * <p>Adding may fail if the allocator is not allowed to consume any more space.
+ *
+ * @param nBytes the number of bytes to add
+ * @return true if the addition is possible, false otherwise
+ * @throws IllegalStateException if called after buffer() is used to allocate the reservation
+ */
+ boolean add(final int nBytes);
+
+ /**
+ * Requests a reservation of additional space.
+ *
+ * <p>The implementation of the allocator's inner class provides this.
+ *
+ * @param nBytes the amount to reserve
+ * @return true if the reservation can be satisfied, false otherwise
+ */
+ boolean reserve(int nBytes);
+
+ /**
+ * Allocate a buffer whose size is the total of all the add()s made.
+ *
+ * <p>The allocation request can still fail, even if the amount of space
+ * requested is available, if the allocation cannot be made contiguously.
+ *
+ * @return the buffer, or null, if the request cannot be satisfied
+ * @throws IllegalStateException if called called more than once
+ */
+ DrillBuf allocateBuffer();
+
+ /**
+ * Get the current size of the reservation (the sum of all the add()s).
+ *
+ * @return size of the current reservation
+ */
+ int getSize();
+
+ /**
+ * Return whether or not the reservation has been used.
+ *
+ * @return whether or not the reservation has been used
+ */
+ public boolean isUsed();
+
+ /**
+ * Return whether or not the reservation has been closed.
+ *
+ * @return whether or not the reservation has been closed
+ */
+ public boolean isClosed();
+
+ public void close();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
new file mode 100644
index 0000000..8bf2a99
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Exception thrown when a closed BufferAllocator is used. Note
+ * this is an unchecked exception.
+ *
+ * @param message string associated with the cause
+ */
+@SuppressWarnings("serial")
+public class AllocatorClosedException extends RuntimeException {
+ public AllocatorClosedException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
new file mode 100644
index 0000000..5142806
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.apache.drill.exec.memory.BaseAllocator.indent;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.BufferManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason that this isn't package private is we're forced to put DrillBuf in Netty's package which need access
+ * to these objects or methods.
+ *
+ * Threading: AllocatorManager manages thread-safety internally. Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocatorManager instance. Important note, there is one AllocatorManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocatorManager should be very low.
+ *
+ */
+public class AllocatorManager {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocatorManager.class);
+
+ private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+ static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
+
+ private final RootAllocator root;
+ private volatile BufferLedger owningLedger;
+ private final int size;
+ private final UnsafeDirectLittleEndian underlying;
+ private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
+ private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
+ private final IdentityHashMap<DrillBuf, Object> buffers =
+ BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
+
+ AllocatorManager(BaseAllocator accountingAllocator, int size) {
+ Preconditions.checkNotNull(accountingAllocator);
+ this.root = accountingAllocator.root;
+ this.underlying = INNER_ALLOCATOR.allocate(size);
+ this.owningLedger = associate(accountingAllocator);
+ this.size = underlying.capacity();
+ }
+
+ /**
+ * Associate the existing underlying buffer with a new allocator.
+ *
+ * @param allocator
+ * The target allocator to associate this buffer with.
+ * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
+ */
+ public BufferLedger associate(final BaseAllocator allocator) {
+ if (root != allocator.root) {
+ throw new IllegalStateException(
+ "A buffer can only be associated between two allocators that share the same root.");
+ }
+
+ try (AutoCloseableLock read = readLock.open()) {
+
+ final BufferLedger ledger = map.get(allocator);
+ if (ledger != null) {
+ return ledger;
+ }
+
+ }
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
+ map.put(allocator, ledger);
+ allocator.associateLedger(ledger);
+ return ledger;
+ }
+ }
+
+
+ /**
+ * The way that a particular BufferLedger communicates back to the AllocatorManager that it now longer needs to hold a
+ * reference to particular piece of memory.
+ */
+ private class ReleaseListener {
+
+ private final BufferAllocator allocator;
+
+ public ReleaseListener(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ public void release() {
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger oldLedger = map.remove(allocator);
+ oldLedger.allocator.dissociateLedger(oldLedger);
+
+ if (oldLedger == owningLedger) {
+ if (map.isEmpty()) {
+ // no one else owns, lets release.
+ oldLedger.allocator.releaseBytes(size);
+ underlying.release();
+ } else {
+ // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+ BufferLedger newLedger = map.values().iterator().next();
+
+ // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+ // since this consumer can do anything with this.
+ oldLedger.transferBalance(newLedger);
+ }
+ }
+
+
+ }
+ }
+ }
+
+ /**
+ * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+ * a set of DrillBufs that share a common fate and set of reference counts.
+ *
+ * As with AllocatorManager, the only reason this is public is due to DrillBuf being in io.netty.buffer package.
+ */
+ public class BufferLedger {
+ private final long id = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
+ private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
+ // correctly
+ private final BaseAllocator allocator;
+ private final ReleaseListener listener;
+ private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
+ "BufferLedger[%d]", 1)
+ : null;
+
+ private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
+ this.allocator = allocator;
+ this.listener = listener;
+ }
+
+ /**
+ * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
+ * memory, no transfer is made to the new ledger.
+ *
+ * @param target
+ * The ledger to transfer ownership account to.
+ * @return Whether transfer fit within target ledgers limits.
+ */
+ public boolean transferBalance(BufferLedger target) {
+ Preconditions.checkNotNull(target);
+ Preconditions.checkArgument(allocator.root == target.allocator.root,
+ "You can only transfer between two allocators that share the same root.");
+
+ // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
+ // that this won't happen by synchronizing on the allocator manager instance.
+ synchronized (AllocatorManager.this) {
+ if (this != owningLedger || target == this) {
+ return true;
+ }
+
+ if (BaseAllocator.DEBUG) {
+ this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
+ target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
+ }
+
+ boolean overlimit = target.allocator.forceAllocate(size);
+ allocator.releaseBytes(size);
+ owningLedger = target;
+ return overlimit;
+ }
+
+ }
+
+ /**
+ * Print the current ledger state to a the provided StringBuilder.
+ *
+ * @param sb
+ * The StringBuilder to populate.
+ * @param indent
+ * The level of indentation to position the data.
+ * @param verbosity
+ * The level of verbosity to print.
+ */
+ public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+ indent(sb, indent)
+ .append("ledger (allocator: ")
+ .append(allocator.name)
+ .append("), isOwning: ")
+ .append(owningLedger == this)
+ .append(", size: ")
+ .append(size)
+ .append(", references: ")
+ .append(bufRefCnt.get())
+ .append('\n');
+
+ if (BaseAllocator.DEBUG) {
+ synchronized (buffers) {
+ indent(sb, indent + 1).append("BufferLedger[" + id + "] holds ").append(buffers.size())
+ .append(" buffers. \n");
+ for (DrillBuf buf : buffers.keySet()) {
+ buf.print(sb, indent + 2, verbosity);
+ sb.append('\n');
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Release this ledger. This means that all reference counts associated with this ledger are no longer used. This
+ * will inform the AllocatorManager to make a decision about how to manage any memory owned by this particular
+ * BufferLedger
+ */
+ public void release() {
+ listener.release();
+ }
+
+ /**
+ * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
+ * ledger associated with this AllocatorManager, a new one is created. This is placed on BufferLedger rather than
+ * AllocatorManager direclty because DrillBufs don't have access to AllocatorManager and they are the ones
+ * responsible for exposing the ability to associate mutliple allocators with a particular piece of underlying
+ * memory.
+ *
+ * @param allocator
+ * @return
+ */
+ public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
+ return associate((BaseAllocator) allocator);
+ }
+
+ /**
+ * Create a new DrillBuf associated with this AllocatorManager and memory. Does not impact reference count.
+ * Typically used for slicing.
+ * @param offset
+ * The offset in bytes to start this new DrillBuf.
+ * @param length
+ * The length in bytes that this DrillBuf will provide access to.
+ * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+ */
+ public DrillBuf newDrillBuf(int offset, int length) {
+ return newDrillBuf(offset, length, null, false);
+ }
+
+ /**
+ * Create a new DrillBuf associated with this AllocatorManager and memory.
+ * @param offset
+ * The offset in bytes to start this new DrillBuf.
+ * @param length
+ * The length in bytes that this DrillBuf will provide access to.
+ * @param manager
+ * An optional BufferManager argument that can be used to manage expansion of this DrillBuf
+ * @param retain
+ * Whether or not the newly created buffer should get an additional reference count added to it.
+ * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+ */
+ public DrillBuf newDrillBuf(int offset, int length, BufferManager manager, boolean retain) {
+ final DrillBuf buf = new DrillBuf(
+ bufRefCnt,
+ this,
+ underlying,
+ manager,
+ allocator.getAsByteBufAllocator(),
+ offset,
+ length,
+ false);
+
+ if (retain) {
+ buf.retain();
+ }
+
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent(
+ "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
+ + "%d](%s)) => ledger hc == %d",
+ allocator.name, System.identityHashCode(buf), buf.toString(),
+ System.identityHashCode(this));
+
+ synchronized (buffers) {
+ buffers.put(buf, null);
+ }
+ }
+
+ return buf;
+
+ }
+
+ /**
+ * What is the total size (in bytes) of memory underlying this ledger.
+ *
+ * @return Size in bytes
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
+ * memory or zero in the case that this is not the owning ledger associated with this memory.
+ *
+ * @return Amount of accounted(owned) memory associated with this ledger.
+ */
+ public int getAccountedSize() {
+ try (AutoCloseableLock read = readLock.open()) {
+ if (owningLedger == this) {
+ return size;
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ /**
+ * Package visible for debugging/verification only.
+ */
+ UnsafeDirectLittleEndian getUnderlying() {
+ return underlying;
+ }
+
+ /**
+ * Package visible for debugging/verification only.
+ */
+ boolean isOwningLedger() {
+ return this == owningLedger;
+ }
+
+ }
+
+}
\ No newline at end of file