You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:27 UTC
[04/13] drill git commit: DRILL-4134: Add new allocator
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index b85502b..d244b26 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,28 +27,40 @@ import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.common.HistoricalLog;
import org.apache.drill.exec.memory.Accountor;
-import org.apache.drill.exec.memory.BoundsChecking;
+import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.BufferManager;
-
+import org.apache.drill.exec.memory.BufferLedger;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.Pointer;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
- private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
+ private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
+ private static final boolean DEBUG = BaseAllocator.isDebug();
+ private static final AtomicInteger idGenerator = new AtomicInteger(0);
- private final ByteBuf b;
+ private final ByteBuf byteBuf;
private final long addr;
private final int offset;
- private final boolean rootBuffer;
- private final AtomicLong rootRefCnt = new AtomicLong(1);
+ private final int flags;
+ private final AtomicInteger rootRefCnt;
private volatile BufferAllocator allocator;
- private volatile Accountor acct;
- private volatile int length;
// TODO - cleanup
// The code is partly shared and partly copy-pasted between
@@ -56,28 +68,153 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
// to share code and to remove the hacky code here to use only
// one of these types at a time and use null checks to find out
// which.
+ private final boolean oldWorld; // Indicates that we're operating with TopLevelAllocator.
+ private final boolean rootBuffer;
+ private volatile Accountor acct;
private BufferManager bufManager;
+ @Deprecated private OperatorContext operatorContext;
+ @Deprecated private FragmentContext fragmentContext;
+
+ private volatile BufferLedger bufferLedger;
+ private volatile int length; // TODO this just seems to duplicate .capacity()
+
+ // members used purely for debugging
+ // TODO once we have a reduced number of constructors, move these to DEBUG clauses in them
+ private final int id = idGenerator.incrementAndGet();
+ private final HistoricalLog historicalLog = DEBUG ? new HistoricalLog(4, "DrillBuf[%d]", id) : null;
+ private final static IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>> unwrappedMap =
+ DEBUG ? new IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>>() : null;
+
+  /**
+   * Debug bookkeeping: records this DrillBuf in the static unwrappedMap under the
+   * UnsafeDirectLittleEndian it wraps. The map only exists when DEBUG is set, and the
+   * visible callers invoke this from inside {@code if (DEBUG)} blocks.
+   */
+  private void unwrappedPut() {
+    final UnsafeDirectLittleEndian key = (UnsafeDirectLittleEndian) byteBuf;
+    synchronized (unwrappedMap) {
+      final Collection<DrillBuf> known = unwrappedMap.get(key);
+      if (known != null) {
+        known.add(this);
+        return;
+      }
+
+      // first wrapper seen for this underlying buffer
+      final Collection<DrillBuf> fresh = new LinkedList<DrillBuf>();
+      fresh.add(this);
+      unwrappedMap.put(key, fresh);
+    }
+  }
+
+  /**
+   * Look up the DrillBufs currently registered as wrappers of the given underlying buffer.
+   *
+   * <p>The registry is only allocated when DEBUG is enabled. When it is absent this method
+   * returns an empty collection instead of synchronizing on a null reference, so callers
+   * such as logHistoryForUdle() are safe in non-debug runs.
+   *
+   * @param udle the underlying buffer to look up
+   * @return a snapshot copy of the registered wrappers, or an empty collection if there
+   *   are none (or if debug tracking is disabled)
+   */
+  public static Collection<DrillBuf> unwrappedGet(final UnsafeDirectLittleEndian udle) {
+    if (unwrappedMap == null) {
+      // DEBUG is off: nothing is ever tracked, and synchronized(null) would throw.
+      return Collections.emptyList();
+    }
+
+    synchronized (unwrappedMap) {
+      final Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
+      if (drillBufs == null) {
+        return Collections.emptyList();
+      }
+      // copy, so the caller can iterate without holding the registry lock
+      return new LinkedList<DrillBuf>(drillBufs);
+    }
+  }
+
+  /**
+   * Debug bookkeeping: removes a DrillBuf from the registry entry of the buffer it wraps,
+   * dropping the entry entirely once its last wrapper is gone.
+   *
+   * @param drillBuf the buffer to unregister
+   * @return true if the buffer was registered and has been removed; false if it does not
+   *   wrap an UnsafeDirectLittleEndian or was not found in the registry
+   */
+  private static boolean unwrappedRemove(final DrillBuf drillBuf) {
+    final ByteBuf byteBuf = drillBuf.unwrap();
+    if (!(byteBuf instanceof UnsafeDirectLittleEndian)) {
+      return false;
+    }
+
+    final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
+    synchronized (unwrappedMap) {
+      final Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
+      if (drillBufs == null) {
+        return false;
+      }
+      // Collection.remove(Object) reports success as a boolean; keeping it as a boolean
+      // (rather than boxing it and testing for null) lets "not found" return false.
+      final boolean removed = drillBufs.remove(drillBuf);
+      if (drillBufs.isEmpty()) {
+        unwrappedMap.remove(udle);
+      }
+      return removed;
+    }
+  }
+ /**
+ * Creates an old-world root buffer (oldWorld == true, rootBuffer == true) over the given
+ * underlying buffer, accounted for by the supplied Accountor.
+ *
+ * @param allocator the allocator recorded as this buffer's allocator
+ * @param a the accountor recorded as this buffer's accountor
+ * @param b the underlying buffer; supplies max capacity, capacity and memory address
+ */
public DrillBuf(BufferAllocator allocator, Accountor a, UnsafeDirectLittleEndian b) {
super(b.maxCapacity());
- this.b = b;
+ this.byteBuf = b;
this.addr = b.memoryAddress();
this.acct = a;
this.length = b.capacity();
this.offset = 0;
this.rootBuffer = true;
this.allocator = allocator;
+
+ // members from the new world order
+ flags = 0;
+ // An old-world root buffer counts its own references: refCnt(), retain() and release()
+ // all dereference rootRefCnt when oldWorld && rootBuffer, so a null here would fail
+ // with a NullPointerException on first use. Start at one reference, as the removed
+ // AtomicLong field initializer did.
+ rootRefCnt = new AtomicInteger(1);
+ oldWorld = true;
+ }
+
+ /**
+ * Creates a new-world (ledger-based) DrillBuf directly over an UnsafeDirectLittleEndian.
+ * The constructor takes one additional reference on {@code byteBuf}, starts this buffer's
+ * own reference count at one and, when DEBUG is enabled, registers the buffer in the
+ * unwrapped map and records the construction in its historical log.
+ *
+ * @param bufferLedger the ledger stored as this buffer's ledger
+ * @param bufferAllocator the allocator stored as this buffer's allocator
+ * @param byteBuf the underlying buffer; supplies max capacity, capacity and memory address
+ */
+ public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
+ final UnsafeDirectLittleEndian byteBuf) {
+ super(byteBuf.maxCapacity());
+ this.byteBuf = byteBuf;
+ byteBuf.retain(1); // this wrapper holds its own reference on the underlying buffer
+ this.bufferLedger = bufferLedger;
+ addr = byteBuf.memoryAddress();
+ allocator = bufferAllocator;
+ length = byteBuf.capacity();
+ offset = 0;
+ flags = 0; // F_DERIVED not set: this buffer owns its reference count
+ rootRefCnt = new AtomicInteger(1);
+ oldWorld = false;
+
+ // members from the old world order (unused when oldWorld is false)
+ rootBuffer = false;
+ acct = null;
+
+ if (DEBUG) {
+ unwrappedPut();
+ historicalLog.recordEvent(
+ "DrillBuf(BufferLedger, BufferAllocator[%d], UnsafeDirectLittleEndian[identityHashCode == "
+ + "%d](%s)) => rootRefCnt identityHashCode == %d",
+ bufferAllocator.getId(), System.identityHashCode(byteBuf), byteBuf.toString(),
+ System.identityHashCode(rootRefCnt));
+ }
}
private DrillBuf(BufferAllocator allocator, Accountor a) {
super(0);
- this.b = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
+ this.byteBuf = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
this.allocator = allocator;
this.acct = a;
this.length = 0;
this.addr = 0;
this.rootBuffer = false;
this.offset = 0;
+
+ // members from the new world order
+ flags = 0;
+ rootRefCnt = null;
+ oldWorld = true;
+ }
+
+  /**
+   * Builds an empty, zero-capacity new-world buffer backed by a little-endian EmptyByteBuf
+   * obtained from the ledger's underlying allocator. The buffer starts with a reference
+   * count of one and is not registered in the unwrapped map.
+   *
+   * @param bufferLedger the ledger stored as this buffer's ledger
+   * @param bufferAllocator the allocator stored as this buffer's allocator
+   */
+  private DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
+    super(0);
+
+    // members from the old world order
+    this.oldWorld = false;
+    this.rootBuffer = false;
+    this.acct = null;
+
+    // new-world state of a zero-length buffer
+    this.allocator = bufferAllocator;
+    this.bufferLedger = bufferLedger;
+    this.byteBuf = new EmptyByteBuf(bufferLedger.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
+    this.addr = 0;
+    this.offset = 0;
+    this.length = 0;
+    this.flags = 0;
+    this.rootRefCnt = new AtomicInteger(1);
+
+    if (DEBUG) {
+      // We don't put the empty buffers in the unwrappedMap.
+      historicalLog.recordEvent(
+          "DrillBuf(BufferLedger, BufferAllocator[%d]) => rootRefCnt identityHashCode == %d",
+          bufferAllocator.getId(), System.identityHashCode(this.rootRefCnt));
+    }
}
/**
@@ -96,7 +233,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
b.unwrap().unwrap().retain();
}
-
private DrillBuf(DrillBuf buffer, int index, int length) {
this(buffer.allocator, null, buffer, buffer, index, length, false);
}
@@ -105,6 +241,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
ByteBuf underlying = b.unwrap().unwrap();
return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
}
+
private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
super(length);
if (index < 0 || index > buffer.capacity() - length) {
@@ -114,16 +251,138 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
this.length = length;
writerIndex(length);
- this.b = replacement;
+ this.byteBuf = replacement;
this.addr = buffer.memoryAddress() + index;
this.offset = index;
this.acct = a;
this.length = length;
this.rootBuffer = root;
this.allocator = allocator;
+
+ // members from the new world order
+ flags = 0;
+ rootRefCnt = null;
+ oldWorld = true;
}
+ /**
+ * Indicate a shared refcount, as per http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5
+ */
+ private final static int F_DERIVED = 0x0002;
+
+ // Public entry point for sharing; the work is done by the private seven-argument constructor.
+ /**
+ * Used for sharing: creates a DrillBuf over the same underlying buffer as an existing
+ * DrillBuf. The underlying buffer is found with getUnderlyingUdle(originalBuf), and
+ * {@code index} is shifted by {@code originalBuf.offset} before delegating, so callers
+ * pass an index relative to {@code originalBuf}.
+ *
+ * @param bufferLedger the ledger for the new buffer
+ * @param bufferAllocator the allocator for the new buffer
+ * @param originalBuf the existing DrillBuf whose underlying buffer is wrapped
+ * @param index starting position of the new buffer, relative to originalBuf
+ * @param length length of the new buffer
+ * @param flags construction flags; with F_DERIVED set the new buffer shares originalBuf's
+ * reference count instead of getting its own
+ */
+ public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
+ final DrillBuf originalBuf, final int index, final int length, final int flags) {
+ this(bufferAllocator, bufferLedger, getUnderlyingUdle(originalBuf),
+ originalBuf, index + originalBuf.offset, length, flags);
+ }
+
+  /**
+   * Peels wrappers off a DrillBuf until an UnsafeDirectLittleEndian (or an EmptyByteBuf)
+   * is reached. When DEBUG is enabled, needing more than a single unwrap is treated as
+   * an error.
+   *
+   * @param originalBuf the DrillBuf to unwrap
+   * @return the first UnsafeDirectLittleEndian or EmptyByteBuf found beneath it
+   * @throws IllegalStateException in DEBUG mode, if more than one level had to be unwrapped
+   */
+  private static ByteBuf getUnderlyingUdle(final DrillBuf originalBuf) {
+    ByteBuf current = originalBuf.unwrap();
+    int depth = 1;
+    for (;;) {
+      if (current instanceof UnsafeDirectLittleEndian || current instanceof EmptyByteBuf) {
+        break;
+      }
+      current = current.unwrap();
+      depth++;
+    }
+
+    if (DEBUG && depth > 1) {
+      throw new IllegalStateException("UnsafeDirectLittleEndian is wrapped more than one level");
+    }
+
+    return current;
+  }
+
+ // Core new-world constructor; the public sharing constructor delegates here.
+ /*
+ * Builds a DrillBuf of the given length over replacement, starting at index within it.
+ * Without F_DERIVED the new buffer takes its own reference on replacement and starts a
+ * fresh reference count at one; with F_DERIVED it takes no extra reference and shares
+ * buffer's rootRefCnt object instead.
+ *
+ * Throws IndexOutOfBoundsException when index and length do not fit within
+ * replacement.maxCapacity().
+ *
+ * NOTE(review): the "buffer instanceof DrillBuf" test runs after replacement.retain(1);
+ * since the parameter is already typed DrillBuf it looks like it can only fail for null,
+ * in which case the extra reference taken just before would not be undone -- confirm.
+ *
+ * NOTE(review): in DEBUG mode unwrappedPut() casts byteBuf to UnsafeDirectLittleEndian,
+ * while getUnderlyingUdle() may also hand back an EmptyByteBuf -- confirm that slicing
+ * an empty buffer cannot reach this path.
+ *
+ * TODO the replacement argument becomes an UnsafeDirectLittleEndian;
+ * buffer argument may go away if it is determined to be unnecessary after all
+ * the deprecated stuff is removed (I suspect only the replacement argument is
+ * necessary then).
+ */
+ private DrillBuf(BufferAllocator allocator, BufferLedger bufferLedger,
+ ByteBuf replacement, DrillBuf buffer, int index, int length, int flags) {
+ super(replacement.maxCapacity());
+
+ // members from the old world order (unused when oldWorld is false)
+ rootBuffer = false;
+ acct = null;
+ oldWorld = false;
+
+ if (index < 0 || index > (replacement.maxCapacity() - length)) {
+ throw new IndexOutOfBoundsException(replacement.toString() + ".slice(" + index + ", " + length + ')');
+ }
+
+ this.flags = flags;
+
+ this.length = length; // the value this class treats as capacity()
+ writerIndex(length);
+
+ byteBuf = replacement;
+ if ((flags & F_DERIVED) == 0) { // not derived: hold our own reference on the underlying buffer
+ replacement.retain(1);
+ }
+
+ addr = replacement.memoryAddress() + index;
+ offset = index;
+ this.bufferLedger = bufferLedger;
+ if (!(buffer instanceof DrillBuf)) {
+ throw new IllegalArgumentException("DrillBuf slicing can only be performed on other DrillBufs");
+ }
+
+ if ((flags & F_DERIVED) != 0) { // derived: share the source buffer's counter object
+ final DrillBuf rootBuf = (DrillBuf) buffer;
+ rootRefCnt = rootBuf.rootRefCnt;
+ } else {
+ rootRefCnt = new AtomicInteger(1);
+ }
+
+ this.allocator = allocator;
+
+ if (DEBUG) {
+ unwrappedPut();
+ historicalLog.recordEvent(
+ "DrillBuf(BufferAllocator[%d], BufferLedger, ByteBuf[identityHashCode == "
+ + "%d](%s), DrillBuf[%d], index = %d, length = %d, flags = 0x%08x)"
+ + " => rootRefCnt identityHashCode == %d",
+ allocator.getId(), System.identityHashCode(replacement), replacement.toString(),
+ buffer.id, index, length, flags, System.identityHashCode(rootRefCnt));
+ }
+ }
+
+ @Deprecated
+ public void setOperatorContext(OperatorContext c) {
+ this.operatorContext = c;
+ }
+
+ @Deprecated
+ public void setFragmentContext(FragmentContext c) {
+ this.fragmentContext = c;
+ }
+
+ // TODO(DRILL-3331)
public void setBufferManager(BufferManager bufManager) {
+ Preconditions.checkState(this.bufManager == null,
+ "the BufferManager for a buffer can only be set once");
this.bufManager = bufManager;
}
@@ -131,47 +390,50 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return allocator;
}
- public DrillBuf reallocIfNeeded(int size) {
+ public DrillBuf reallocIfNeeded(final int size) {
+ Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
+
if (this.capacity() >= size) {
return this;
}
- if (bufManager != null) {
+ if (operatorContext != null) {
+ return operatorContext.replace(this, size);
+ } else if(fragmentContext != null) {
+ return fragmentContext.replace(this, size);
+ } else if (bufManager != null) {
return bufManager.replace(this, size);
} else {
throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
}
-
}
@Override
public int refCnt() {
- if(rootBuffer){
- return (int) this.rootRefCnt.get();
- }else{
- return b.refCnt();
+ if (oldWorld) {
+ if(rootBuffer){
+ return (int) this.rootRefCnt.get();
+ }else{
+ return byteBuf.refCnt();
+ }
}
+ return rootRefCnt.get();
}
private long addr(int index) {
return addr + index;
}
- private final void checkIndexD(int index) {
- ensureAccessible();
- if (index < 0 || index >= capacity()) {
- throw new IndexOutOfBoundsException(String.format(
- "index: %d (expected: range(0, %d))", index, capacity()));
- }
- }
-
private final void checkIndexD(int index, int fieldLength) {
ensureAccessible();
if (fieldLength < 0) {
throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
}
if (index < 0 || index > capacity() - fieldLength) {
+ if (DEBUG) {
+ historicalLog.logHistory(logger);
+ }
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
}
@@ -186,7 +448,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
* @param start The starting position of the bytes to be read.
* @param end The exclusive endpoint of the bytes to be read.
*/
- public void checkBytes(int start, int end){
+ public void checkBytes(int start, int end) {
if (BOUNDS_CHECKING_ENABLED) {
checkIndexD(start, end - start);
}
@@ -198,18 +460,51 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
}
- private void chk(int index) {
- if (BOUNDS_CHECKING_ENABLED) {
- checkIndexD(index);
- }
- }
-
private void ensure(int width) {
if (BOUNDS_CHECKING_ENABLED) {
ensureWritable(width);
}
}
+ /**
+ * Used by allocators to transfer ownership from one allocator to another.
+ * The transfer itself is delegated to the current ledger's transferTo(); afterwards this
+ * buffer records {@code newAllocator} as its allocator and adopts whatever ledger is left
+ * in the pointer passed to that call (initialised with {@code newLedger}; presumably the
+ * ledger may substitute a different one -- verify against BufferLedger).
+ *
+ * @param newAllocator the new allocator
+ * @param newLedger the new ledger the buffer should use going forward
+ * @return whether or not the buffer fits the receiving allocator's allocation limit,
+ * as reported by the ledger
+ */
+ public boolean transferTo(final BufferAllocator newAllocator, final BufferLedger newLedger) {
+ final Pointer<BufferLedger> pNewLedger = new Pointer<>(newLedger);
+ final boolean fitsAllocation = bufferLedger.transferTo(newAllocator, pNewLedger, this);
+ allocator = newAllocator;
+ bufferLedger = pNewLedger.value;
+ return fitsAllocation;
+ }
+
+ /**
+ * DrillBuf's implementation of sharing buffer functionality, to be accessed from
+ * {@link BufferAllocator#shareOwnership(DrillBuf, Pointer)}. See that function
+ * for more information.
+ *
+ * <p>Delegates to the private overload with flags of 0, i.e. without F_DERIVED.
+ *
+ * @param otherLedger the ledger belonging to the other allocator to share with
+ * @param otherAllocator the other allocator to be shared with
+ * @param index the starting index (for slicing capability)
+ * @param length the length (for slicing capability)
+ * @return the new DrillBuf (wrapper)
+ */
+ public DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+ final int index, final int length) {
+ return shareWith(otherLedger, otherAllocator, index, length, 0);
+ }
+
+ /*
+ * Shared implementation behind the public shareWith() and slice(): asks the current
+ * ledger to share this buffer, replaces this buffer's ledger with the one that call
+ * returns, and hands back the DrillBuf left in the pointer (presumably filled in by
+ * BufferLedger.shareWith -- verify there).
+ *
+ * The flags value is passed through unchanged; slice() passes F_DERIVED, the public
+ * overload passes 0.
+ */
+ private DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+ final int index, final int length, final int flags) {
+ final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
+ bufferLedger = bufferLedger.shareWith(pDrillBuf, otherLedger, otherAllocator, this, index, length, flags);
+ return pDrillBuf.value;
+ }
+
public boolean transferAccounting(Accountor target) {
if (rootBuffer) {
boolean outcome = acct.transferTo(target, this, length);
@@ -221,7 +516,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public synchronized boolean release() {
+ public boolean release() {
return release(1);
}
@@ -230,20 +525,44 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
*/
@Override
public synchronized boolean release(int decrement) {
+ Preconditions.checkArgument(decrement > 0,
+ "release(%d) argument is not positive", decrement);
+ if (DEBUG) {
+ historicalLog.recordEvent("release(%d)", decrement);
+ }
- if(rootBuffer){
- final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
- Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
- if (newRefCnt == 0) {
- b.release(decrement);
- acct.release(this, length);
- return true;
+ if (oldWorld) {
+ if(rootBuffer){
+ final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
+ Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
+ if (newRefCnt == 0) {
+ byteBuf.release(decrement);
+ acct.release(this, length);
+ return true;
+ }else{
+ return false;
+ }
}else{
- return false;
+ return byteBuf.release(decrement);
}
- }else{
- return b.release(decrement);
}
+
+ final int refCnt = rootRefCnt.addAndGet(-decrement);
+ Preconditions.checkState(refCnt >= 0, "DrillBuf[%d] refCnt has gone negative", id);
+ if (refCnt == 0) {
+ bufferLedger.release(this);
+
+ if (DEBUG) {
+ unwrappedRemove(this);
+ }
+
+ // release the underlying buffer
+ byteBuf.release(1);
+
+ return true;
+ }
+
+ return false;
}
@Override
@@ -253,31 +572,45 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public synchronized ByteBuf capacity(int newCapacity) {
- if (rootBuffer) {
- if (newCapacity == length) {
- return this;
- } else if (newCapacity < length) {
- b.capacity(newCapacity);
- int diff = length - b.capacity();
- acct.releasePartial(this, diff);
- this.length = length - diff;
- return this;
+ if (oldWorld) {
+ if (rootBuffer) {
+ if (newCapacity == length) {
+ return this;
+ } else if (newCapacity < length) {
+ byteBuf.capacity(newCapacity);
+ int diff = length - byteBuf.capacity();
+ acct.releasePartial(this, diff);
+ this.length = length - diff;
+ return this;
+ } else {
+ throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
+ }
} else {
- throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
+ throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
}
- } else {
- throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
}
- }
- @Override
- public int maxCapacity() {
- return length;
+ if ((flags & F_DERIVED) != 0) {
+ throw new UnsupportedOperationException("Derived buffers don't support resizing.");
+ }
+
+ if (newCapacity == length) {
+ return this;
+ }
+
+ if (newCapacity < length) {
+ byteBuf.capacity(newCapacity);
+ final int diff = length - byteBuf.capacity();
+ length -= diff;
+ return this;
+ }
+
+ throw new UnsupportedOperationException("Buffers don't support resizing that increases the size.");
}
@Override
public ByteBufAllocator alloc() {
- return b.alloc();
+ return byteBuf.alloc();
}
@Override
@@ -287,14 +620,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf order(ByteOrder endianness) {
- // if(endianness != ByteOrder.LITTLE_ENDIAN) throw new
- // UnsupportedOperationException("Drill buffers only support little endian.");
return this;
}
@Override
public ByteBuf unwrap() {
- return b;
+ return byteBuf;
}
@Override
@@ -309,7 +640,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf readSlice(int length) {
- ByteBuf slice = slice(readerIndex(), length);
+ final ByteBuf slice = slice(readerIndex(), length);
readerIndex(readerIndex() + length);
return slice;
}
@@ -329,16 +660,42 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return slice(readerIndex(), readableBytes());
}
+  /**
+   * Formats a one-line summary of a buffer's capacity, maximum capacity, reader and
+   * writer indexes, and readable and writable byte counts.
+   *
+   * @param buf the buffer to describe
+   * @return the formatted summary
+   */
+  public static String bufferState(final ByteBuf buf) {
+    return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d",
+        buf.capacity(), buf.maxCapacity(),
+        buf.readerIndex(), buf.readableBytes(),
+        buf.writerIndex(), buf.writableBytes());
+  }
+
@Override
public DrillBuf slice(int index, int length) {
- DrillBuf buf = new DrillBuf(this, index, length);
- buf.writerIndex = length;
+ if (oldWorld) {
+ DrillBuf buf = new DrillBuf(this, index, length);
+ buf.writerIndex = length;
+ return buf;
+ }
+
+ /*
+ * Re the behavior of reference counting,
+ * see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which explains
+ * that derived buffers share their reference count with their parent
+ */
+ final DrillBuf buf = shareWith(bufferLedger, allocator, index, length, F_DERIVED);
+ buf.writerIndex(length);
return buf;
}
@Override
public DrillBuf duplicate() {
- return new DrillBuf(this, 0, length);
+ if (oldWorld) {
+ return new DrillBuf(this, 0, length);
+ }
+
+ return slice(0, length);
}
@Override
@@ -353,12 +710,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuffer nioBuffer(int index, int length) {
- return b.nioBuffer(offset + index, length);
+ return byteBuf.nioBuffer(offset + index, length);
}
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
- return b.internalNioBuffer(offset + index, length);
+ return byteBuf.internalNioBuffer(offset + index, length);
}
@Override
@@ -373,17 +730,17 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public boolean hasArray() {
- return b.hasArray();
+ return byteBuf.hasArray();
}
@Override
public byte[] array() {
- return b.array();
+ return byteBuf.array();
}
@Override
public int arrayOffset() {
- return b.arrayOffset();
+ return byteBuf.arrayOffset();
}
@Override
@@ -397,17 +754,26 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
+ public String toString() {
+ return toString(0, 0, Charsets.UTF_8);
+ }
+
+ @Override
public String toString(Charset charset) {
- return toString(readerIndex, readableBytes(), charset);
+ return toString(readerIndex, readableBytes(), charset);
}
@Override
public String toString(int index, int length, Charset charset) {
+ final String basics =
+ String.format("{DrillBuf[%d], udle identityHashCode == %d, rootRefCnt identityHashCode == %d}",
+ id, System.identityHashCode(byteBuf), System.identityHashCode(rootRefCnt));
+
if (length == 0) {
- return "";
+ return basics;
}
- ByteBuffer nioBuffer;
+ final ByteBuffer nioBuffer;
if (nioBufferCount() == 1) {
nioBuffer = nioBuffer(index, length);
} else {
@@ -416,7 +782,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
nioBuffer.flip();
}
- return ByteBufUtil.decodeString(nioBuffer, charset);
+ return basics + '\n' + ByteBufUtil.decodeString(nioBuffer, charset);
}
@Override
@@ -431,12 +797,22 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public synchronized ByteBuf retain(int increment) {
- if(rootBuffer){
- this.rootRefCnt.addAndGet(increment);
- }else{
- b.retain(increment);
+ public ByteBuf retain(int increment) {
+ Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
+ if (DEBUG) {
+ historicalLog.recordEvent("retain(%d)", increment);
}
+
+ if (oldWorld) {
+ if(rootBuffer){
+ this.rootRefCnt.addAndGet(increment);
+ }else{
+ byteBuf.retain(increment);
+ }
+ return this;
+ }
+
+ rootRefCnt.addAndGet(increment);
return this;
}
@@ -448,7 +824,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public long getLong(int index) {
chk(index, 8);
- long v = PlatformDependent.getLong(addr(index));
+ final long v = PlatformDependent.getLong(addr(index));
return v;
}
@@ -475,7 +851,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public int getInt(int index) {
chk(index, 4);
- int v = PlatformDependent.getInt(addr(index));
+ final int v = PlatformDependent.getInt(addr(index));
return v;
}
@@ -583,13 +959,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
- b.getBytes(index + offset, dst, dstIndex, length);
+ byteBuf.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
- b.getBytes(index + offset, dst);
+ byteBuf.getBytes(index + offset, dst);
return this;
}
@@ -657,19 +1033,19 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
- b.getBytes(index + offset, dst, dstIndex, length);
+ byteBuf.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
- b.getBytes(index + offset, out, length);
+ byteBuf.getBytes(index + offset, out, length);
return this;
}
@Override
protected int _getUnsignedMedium(int index) {
- long addr = addr(index);
+ final long addr = addr(index);
return (PlatformDependent.getByte(addr) & 0xff) << 16 |
(PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
PlatformDependent.getByte(addr + 2) & 0xff;
@@ -677,12 +1053,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
- return b.getBytes(index + offset, out, length);
+ return byteBuf.getBytes(index + offset, out, length);
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
- b.setBytes(index + offset, src, srcIndex, length);
+ byteBuf.setBytes(index + offset, src, srcIndex, length);
return this;
}
@@ -693,12 +1069,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
length);
} else {
if (srcIndex == 0 && src.capacity() == length) {
- b.setBytes(index + offset, src);
+ byteBuf.setBytes(index + offset, src);
} else {
ByteBuffer newBuf = src.duplicate();
newBuf.position(srcIndex);
newBuf.limit(srcIndex + length);
- b.setBytes(index + offset, src);
+ byteBuf.setBytes(index + offset, src);
}
}
@@ -707,24 +1083,24 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- b.setBytes(index + offset, src, srcIndex, length);
+ byteBuf.setBytes(index + offset, src, srcIndex, length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
- b.setBytes(index + offset, src);
+ byteBuf.setBytes(index + offset, src);
return this;
}
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
- return b.setBytes(index + offset, in, length);
+ return byteBuf.setBytes(index + offset, in, length);
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
- return b.setBytes(index + offset, in, length);
+ return byteBuf.setBytes(index + offset, in, length);
}
@Override
@@ -737,8 +1113,33 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return new DrillBuf(allocator, a);
}
+ public static DrillBuf getEmpty(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
+ return new DrillBuf(bufferLedger, bufferAllocator);
+ }
+
+ /**
+ * Find out if this is a "root buffer." This is obsolete terminology
+ * based on the original implementation of DrillBuf, which would layer
+ * DrillBufs on top of other DrillBufs when slicing (or duplicating).
+ * The buffer at the bottom of the layer was the "root buffer." However,
+ * the current implementation flattens such references to always make
+ * DrillBufs that wrap a single buffer underneath, and slices and
+ * their original source have a shared fate as per
+ * http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, so
+ * this concept isn't really meaningful anymore. But there are callers
+ * that want to know a buffer's original size, and whether or not it
+ * is "primal" in some sense. Perhaps this just needs a new name that
+ * indicates that the buffer was an "original" and not a slice.
+ *
+ * @return whether or not the buffer is an original
+ */
+ @Deprecated
public boolean isRootBuffer() {
- return rootBuffer;
+ if (oldWorld) {
+ return rootBuffer;
+ }
+
+ return (flags & F_DERIVED) == 0;
}
@Override
@@ -746,4 +1147,72 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
release();
}
+ /**
+ * Indicates whether this DrillBuf and the supplied one have a "shared fate."
+ * Having a "shared fate" indicates that the two DrillBufs share a reference
+ * count, and will both be released at the same time if either of them is
+ * released.
+ *
+ * <p>The check is an identity comparison of the two buffers' rootRefCnt objects.
+ * NOTE(review): buffers built by constructors that leave rootRefCnt null compare as
+ * sharing a fate with each other (null == null) -- confirm whether callers can pass
+ * such buffers.
+ *
+ * @param otherBuf the other buffer to check against; must not be null
+ * @return true if the two buffers have a shared fate, false otherwise
+ */
+ public boolean hasSharedFate(final DrillBuf otherBuf) {
+ return rootRefCnt == otherBuf.rootRefCnt;
+ }
+
+  private final static int LOG_BYTES_PER_ROW = 10;
+  /**
+   * Log this buffer's byte contents in the form of a hex dump.
+   *
+   * <p>Output is written at trace level in rows of LOG_BYTES_PER_ROW bytes. The first row
+   * starts at {@code start} rounded down to a row boundary, and rows are emitted until the
+   * whole range {@code [start, start + length)} has been covered, so an unaligned start no
+   * longer cuts off the end of the requested range. Indexes that fall outside the buffer
+   * are shown as {@code <ioob>}.
+   *
+   * @param logger where to log to
+   * @param start the starting byte index
+   * @param length how many bytes to log; nothing but the header is logged if not positive
+   */
+  public void logBytes(final Logger logger, final int start, final int length) {
+    final int roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
+
+    // Rounding the start down adds (start - roundedStart) leading bytes to the dump, so
+    // the row count has to cover those as well as the requested length.
+    final long span = (long) start - roundedStart + length;
+    final long rows = (length > 0) ? (span + LOG_BYTES_PER_ROW - 1) / LOG_BYTES_PER_ROW : 0;
+
+    final StringBuilder sb = new StringBuilder("buffer byte dump\n");
+    int index = roundedStart;
+    for (long row = 0; row < rows; ++row) {
+      sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1));
+      for (int i = 0; i < LOG_BYTES_PER_ROW; ++i) {
+        try {
+          final byte b = getByte(index++);
+          sb.append(String.format(" 0x%02x", b));
+        } catch (IndexOutOfBoundsException ioob) {
+          // deliberately tolerated: mark the slot and keep dumping the rest of the row
+          sb.append(" <ioob>");
+        }
+      }
+      sb.append('\n');
+    }
+    logger.trace(sb.toString());
+  }
+
+ /**
+ * Get the integer id assigned to this DrillBuf for debugging purposes.
+ *
+ * @return integer id
+ */
+ public int getId() {
+ return id;
+ }
+
+  /**
+   * Writes this buffer's recorded history to the given logger. If no history is being
+   * kept for this buffer, a warning saying so is logged instead.
+   *
+   * @param logger the logger to use
+   */
+  public void logHistory(final Logger logger) {
+    if (historicalLog != null) {
+      historicalLog.logHistory(logger);
+      return;
+    }
+
+    logger.warn("DrillBuf[{}] historicalLog not available", id);
+  }
+
+  /**
+   * Logs the history of every DrillBuf that unwrappedGet() reports for the given
+   * underlying buffer.
+   *
+   * @param logger the logger to use
+   * @param udle the underlying buffer whose wrappers should be logged
+   */
+  public void logHistoryForUdle(final Logger logger, final UnsafeDirectLittleEndian udle) {
+    for (final DrillBuf wrapper : unwrappedGet(udle)) {
+      wrapper.logHistory(logger);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
index 53ca91c..b8d0fb2 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -17,10 +17,11 @@
*/
package io.netty.buffer;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.AllocationReservation;
+import org.apache.drill.exec.memory.AllocatorOwner;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.LimitConsumer;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.util.Pointer;
class FakeAllocator implements BufferAllocator {
@@ -46,7 +47,7 @@ class FakeAllocator implements BufferAllocator {
}
@Override
- public BufferAllocator getChildAllocator(LimitConsumer consumer, long initialReservation, long maximumReservation,
+ public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
boolean applyFragmentLimit)
throws OutOfMemoryException {
throw new UnsupportedOperationException();
@@ -63,22 +64,12 @@ class FakeAllocator implements BufferAllocator {
}
@Override
- public PreAllocator getNewPreAllocator() {
+ public void setFragmentLimit(long l) {
throw new UnsupportedOperationException();
}
@Override
- public void resetLimits() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setLimit(long l) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getLimit(){
+ public long getFragmentLimit(){
throw new UnsupportedOperationException();
}
@@ -96,9 +87,10 @@ class FakeAllocator implements BufferAllocator {
return 0;
}
- static class FakeAccountor implements Accountor {
+ static class FakeAccountor extends Accountor {
public FakeAccountor() {
+ super(null, false, null, null, 0, 0, true);
}
@Override
@@ -138,50 +130,38 @@ class FakeAllocator implements BufferAllocator {
@Override
public void releasePartial(DrillBuf buf, long size) {
- throw new UnsupportedOperationException();
- }
- @Override
- public void release(DrillBuf buf, long size) {
- throw new UnsupportedOperationException();
}
@Override
- public void close() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean transferIn(DrillBuf buf, long size) {
- return false;
- }
-
- @Override
- public long getPeakMemoryAllocation() {
- return 0;
- }
+ public void release(DrillBuf buf, long size) {
- @Override
- public long resetFragmentLimits() {
- return 0;
}
@Override
- public void setFragmentLimit(long add) {
- throw new UnsupportedOperationException();
- }
+ public void close() {
- @Override
- public long getFragmentLimit() {
- return 0;
}
+ }
+ @Override
+ public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+ long initReservation, long maxAllocation, int flags) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+ throw new UnsupportedOperationException();
}
@Override
- public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+ public int getId() {
throw new UnsupportedOperationException();
}
+ @Override
+ public AllocationReservation newReservation() {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 559f06d..419aef3 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -27,10 +27,68 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private final AbstractByteBuf wrapped;
private final long memoryAddress;
+ private static final boolean TRACK_BUFFERS = false;
private AtomicLong bufferCount;
private AtomicLong bufferSize;
private long initCap = -1;
+ private final static IdentityHashMap<UnsafeDirectLittleEndian, StackTrace> bufferMap = new IdentityHashMap<>();
+
+ /**
+ * Drops a single reference; delegates to {@code release(1)}.
+ *
+ * @return whatever {@code release(1)} returns
+ */
+ @Override
+ public boolean release() {
+   return this.release(1);
+ }
+
+ /**
+ * Drops {@code decrement} references through the superclass. When TRACK_BUFFERS is enabled
+ * and the superclass reports the buffer as released, the buffer is also removed from the
+ * tracking map and, if initCap has been set (is not -1), the count and size counters are
+ * reduced accordingly.
+ *
+ * @param decrement the number of references to drop
+ * @return the result of the superclass release call
+ * @throws IllegalStateException if tracking is enabled and the released buffer is not in the
+ *     tracking map
+ */
+ @Override
+ public boolean release(int decrement) {
+   final boolean released = super.release(decrement);
+   if (!TRACK_BUFFERS || !released) {
+     return released;
+   }
+
+   // Only the map access is guarded; the counters below are atomics.
+   final Object removed;
+   synchronized (bufferMap) {
+     removed = bufferMap.remove(this);
+   }
+   if (removed == null) {
+     throw new IllegalStateException("no such buffer");
+   }
+
+   if (initCap != -1) {
+     bufferCount.decrementAndGet();
+     bufferSize.addAndGet(-initCap);
+   }
+
+   return released;
+ }
+
+
+ /**
+ * Returns the number of buffers currently held in the tracking map.
+ *
+ * <p>The map is a plain IdentityHashMap that the other accessors in this class guard with
+ * {@code synchronized (bufferMap)}, so the size is read under the same lock.</p>
+ *
+ * @return the number of tracked buffers
+ */
+ public static int getBufferCount() {
+   synchronized (bufferMap) {
+     return bufferMap.size();
+   }
+ }
+
+ /**
+ * Releases every buffer found in the tracking map, dropping each buffer's entire
+ * current reference count. The whole operation runs while holding the map's lock.
+ */
+ public static void releaseBuffers() {
+   synchronized (bufferMap) {
+     // Work from a copy of the keys: release() may remove entries from bufferMap.
+     final LinkedList<UnsafeDirectLittleEndian> snapshot = new LinkedList<>(bufferMap.keySet());
+     for (final UnsafeDirectLittleEndian udle : snapshot) {
+       udle.release(udle.refCnt());
+     }
+   }
+ }
+
+ /**
+ * Logs, at debug level, the stack trace stored in the tracking map for each tracked buffer.
+ * Entries are numbered from 1 in iteration order.
+ *
+ * @param logger the logger to write to
+ */
+ public static void logBuffers(final Logger logger) {
+   synchronized (bufferMap) {
+     int count = 0;
+     // Walk the values directly instead of looking each key up again, and let the logger
+     // format the message so nothing is concatenated when debug output is disabled.
+     for (final StackTrace stackTrace : bufferMap.values()) {
+       ++count;
+       logger.debug("#{} active buffer allocated at\n{}", count, stackTrace);
+     }
+   }
+ }
UnsafeDirectLittleEndian(LargeBuffer buf) {
this(buf, true);
}
@@ -50,7 +108,7 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
throw new IllegalStateException("Drill only runs on LittleEndian systems.");
}
wrapped = buf;
- this.memoryAddress = buf.memoryAddress();
+ memoryAddress = buf.memoryAddress();
}
private long addr(int index) {
return memoryAddress + index;
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index a708e92..7d14b94 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -20,121 +20,167 @@ package org.apache.drill.exec.memory;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.DrillBuf;
-import java.io.Closeable;
-
-import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.util.Pointer;
/**
- * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
+ * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
*/
-public interface BufferAllocator extends Closeable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferAllocator.class);
-
+public interface BufferAllocator extends AutoCloseable {
/**
* Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
- * requested size for rounding purposes. However, the buffers capacity will be set to the configured size.
+ * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
*
- * @param size
- * The size in bytes.
- * @return A new ByteBuf.
- * @throws OutOfMemoryException if buffer cannot be allocated
+ * @param size The size in bytes.
+ * @return a new DrillBuf, or null if the request can't be satisfied
+ * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
*/
- public abstract DrillBuf buffer(int size);
+ public DrillBuf buffer(int size);
/**
* Allocate a new or reused buffer within provided range. Note that the buffer may technically be larger than the
- * requested size for rounding purposes. However, the buffers capacity will be set to the configured size.
+ * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
*
* @param minSize The minimum size in bytes.
* @param maxSize The maximum size in bytes.
- * @return A new ByteBuf.
- * @throws OutOfMemoryException if buffer cannot be allocated
+ * @return a new DrillBuf, or null if the request can't be satisfied
+ * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
*/
- public abstract DrillBuf buffer(int minSize, int maxSize);
+ public DrillBuf buffer(int minSize, int maxSize);
- public abstract ByteBufAllocator getUnderlyingAllocator();
+ /**
+ * Returns the allocator this allocator falls back to when it needs more memory.
+ *
+ * @return the underlying allocator used by this allocator
+ */
+ public ByteBufAllocator getUnderlyingAllocator();
/**
* Create a child allocator nested below this one.
*
- * @param context
- * - BufferManager associated with the new child allocator
- * @param initialReservation
- * - specified in bytes
- * @param maximumReservation
- * - specified in bytes
- * @param applyFragmentLimit
- * - flag to conditionally enable fragment memory limits
+ * @param context - the owner of this allocator
+ * @param initialReservation - specified in bytes
+ * @param maximumReservation - specified in bytes
+ * @param applyFragmentLimit - flag to conditionally enable fragment memory limits
* @return - a new buffer allocator owned by the parent it was spawned from
- * @throws OutOfMemoryException
- * - when off-heap memory has been exhausted
*/
- public abstract BufferAllocator getChildAllocator(LimitConsumer limitListener, long initialReservation,
- long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException;
+ @Deprecated
+ public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
+ long maximumReservation, boolean applyFragmentLimit);
/**
- * Take over ownership of fragment accounting. Always takes over ownership.
- * @param buf
- * @return false if over allocation.
+ * Flag: this allocator is a limiting sub-tree root, meaning that the maxAllocation for
+ * it applies to all its descendant child allocators. In low memory situations, the limits
+ * for sub-tree roots may be adjusted down so that they evenly share the total amount of
+ * direct memory across all the sub-tree roots.
*/
- public boolean takeOwnership(DrillBuf buf) ;
+ public final static int F_LIMITING_ROOT = 0x0001;
/**
- * Take over ownership of fragment accounting. Always takes over ownership.
- * @param buf
- * @return false if over allocation.
+ * Create a new child allocator.
+ *
+ * @param allocatorOwner the allocator owner
+ * @param initReservation the initial space reservation (obtained from this allocator)
+ * @param maxAllocation maximum amount of space the new allocator can allocate
+ * @param flags one or more of BufferAllocator.F_* flags
+ * @return the new allocator, or null if it can't be created
*/
- public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
-
- public PreAllocator getNewPreAllocator();
-
- //public void addFragmentContext(FragmentContext c);
+ public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+ long initReservation, long maxAllocation, int flags);
/**
- * For Top Level Allocators. Reset the fragment limits for all allocators
+ * Take over ownership of the given buffer, adjusting accounting accordingly.
+ * This allocator always takes over ownership.
+ *
+ * @param buf the buffer to take over
+ * @return false if over allocation
*/
- public void resetLimits();
+ public boolean takeOwnership(DrillBuf buf);
/**
- * For Child allocators to set the Fragment limit for the corresponding fragment allocator.
- * @param l the new fragment limit
+ * Share ownership of a buffer between allocators.
+ *
+ * @param buf the buffer
+ * @param bufOut a new DrillBuf owned by this allocator, but sharing the same underlying buffer
+ * @return false if over allocation.
*/
- public void setLimit(long l);
-
- public long getLimit();
-
+ public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
/**
* Not thread safe.
*
* WARNING: unclaimed pre-allocations leak memory. If you call preAllocate(), you must
* make sure to ultimately try to get the buffer and release it.
+ *
+ * For Child allocators to set their Fragment limits.
+ *
+ * @param fragmentLimit the new fragment limit
*/
- public interface PreAllocator {
- public boolean preAllocate(int bytes);
-
- public DrillBuf getAllocation();
- }
+ @Deprecated // happens automatically, and via allocation policies
+ public void setFragmentLimit(long fragmentLimit);
/**
- * @param bytes
- * @return
+ * Returns the current fragment limit.
+ *
+ * @return the current fragment limit
+ */
+ /*
+ * TODO should be replaced with something more general because of
+ * the availability of multiple allocation policies
+ *
+ * TODO We should also have a getRemainingMemory() so operators
+ * can query how much more is left to allocate. That could be
+ * tricky.
*/
+ @Deprecated
+ public long getFragmentLimit();
/**
+ * Return a unique Id for an allocator. Id's may be recycled after
+ * a long period of time.
*
+ * <p>Primary use for this is for debugging output.</p>
+ *
+ * @return the allocator's id
*/
+ public int getId();
/**
* Close and release all buffers generated from this buffer pool.
+ *
+ * <p>When assertions are on, complains if there are any outstanding buffers; to avoid
+ * that, release all buffers before the allocator is closed.
*/
@Override
- public abstract void close();
+ public void close() throws Exception;
- public abstract long getAllocatedMemory();
+ /**
+ * Returns the amount of memory currently allocated from this allocator.
+ *
+ * @return the amount of memory currently allocated
+ */
+ public long getAllocatedMemory();
- public abstract long getPeakMemoryAllocation();
+ /**
+ * Returns the peak amount of memory allocated from this allocator.
+ *
+ * @return the peak amount of memory allocated
+ */
+ public long getPeakMemoryAllocation();
+ /**
+ * Returns an empty DrillBuf.
+ *
+ * @return an empty DrillBuf
+ */
public DrillBuf getEmpty();
+
+ /**
+ * Create an allocation reservation. A reservation is a way of building up
+ * a request for a buffer whose size is not known in advance. See
+ * {@link AllocationReservation}.
+ *
+ * @return the newly created reservation
+ */
+ public AllocationReservation newReservation();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
new file mode 100644
index 0000000..4f1a1bd
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Implicitly specifies an allocation policy by providing a factory method to
+ * create an enforcement agent (an {@link AllocationPolicyAgent}).
+ *
+ * <p>Allocation policies are meant to be global, and may not work properly if
+ * different allocators are given different policies. These are designed to
+ * be supplied to the root-most allocator only, and then shared with descendant
+ * (child) allocators.</p>
+ */
+public interface AllocationPolicy {
+ /**
+ * Create an allocation policy enforcement agent. Each newly created allocator should
+ * call this in order to obtain its own agent.
+ *
+ * @return the newly instantiated agent; if an agent's implementation is stateless,
+ * this may return a sharable singleton
+ */
+ AllocationPolicyAgent newAgent();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
new file mode 100644
index 0000000..ad51ee6
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Per-allocator enforcement agent for allocation policies; created by
+ * {@link AllocationPolicy#newAgent()}. Agents are {@link AutoCloseable}.
+ */
+public interface AllocationPolicyAgent extends AutoCloseable {
+ /**
+ * Checks to see if creating a new allocator using the given specifications
+ * is allowed; should throw an exception if not.
+ *
+ * @param parentAllocator the parent allocator
+ * @param initReservation initial reservation the allocator should have
+ * @param maxAllocation the maximum allocation the allocator will allow
+ * @param flags the allocation option flags
+ * @throws OutOfMemoryException if the new allocator shouldn't be created
+ */
+ void checkNewAllocator(BufferAllocator parentAllocator,
+ long initReservation, long maxAllocation, int flags);
+
+ /**
+ * Get the currently applicable memory limit for the provided allocator.
+ * The interpretation of this value varies with the allocation policy in
+ * use, and each policy should describe what to expect.
+ *
+ * @param bufferAllocator the allocator
+ * @return the memory limit
+ */
+ long getMemoryLimit(BufferAllocator bufferAllocator);
+
+ /**
+ * Initialize the agent for a newly created allocator. Should be called from
+ * the allocator's constructor to initialize the agent for the allocator.
+ *
+ * @param bufferAllocator the newly created allocator.
+ */
+ void initializeAllocator(BufferAllocator bufferAllocator);
+
+ /**
+ * Indicate if any available memory owned by this allocator should
+ * be released to its parent. Allocators may use this to limit the
+ * amount of unused memory they retain for future requests; agents may
+ * request that memory be returned if there is currently a high demand
+ * for memory that other allocators could use if this allocator
+ * doesn't need it.
+ *
+ * @param bufferAllocator the allocator whose available memory is in question
+ * @return true if available memory owned by this allocator should be given
+ * back to its parent
+ */
+ boolean shouldReleaseToParent(BufferAllocator bufferAllocator);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
new file mode 100644
index 0000000..1803572
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Supports cumulative allocation reservation. Clients may increase the size of
+ * the reservation repeatedly until they call for an allocation of the current
+ * total size. The reservation can only be used once, and will throw an exception
+ * if it is used more than once.
+ *
+ * <p>For the purposes of airtight memory accounting, the reservation must be close()d
+ * whether it is used or not.
+ */
+public abstract class AllocationReservation implements AutoCloseable {
+  private int nBytes = 0;
+  private boolean used = false;
+  private boolean closed = false;
+
+  /**
+   * Constructor. Prevent construction except by derived classes.
+   *
+   * <p>The expectation is that the derived class will be a non-static inner
+   * class in an allocator.
+   */
+  protected AllocationReservation() {
+  }
+
+  /**
+   * Add to the current reservation.
+   *
+   * <p>Adding may fail if the allocator is not allowed to consume any more space.
+   *
+   * @param nBytes the number of bytes to add
+   * @return true if the addition is possible, false otherwise
+   * @throws IllegalArgumentException if nBytes is negative
+   * @throws IllegalStateException if called after the reservation has been closed, or after
+   *     buffer() is used to allocate the reservation
+   */
+  public boolean add(final int nBytes) {
+    // Guava's Preconditions templates substitute only %s; a %d would be printed literally
+    // and the value appended in brackets instead.
+    Preconditions.checkArgument(nBytes >= 0, "nBytes(%s) < 0", nBytes);
+    Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
+    Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
+
+    if (!reserve(nBytes)) {
+      return false;
+    }
+
+    this.nBytes += nBytes;
+    return true;
+  }
+
+  /**
+   * Requests a reservation of additional space.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the amount to reserve
+   * @return true if the reservation can be satisfied, false otherwise
+   */
+  protected abstract boolean reserve(int nBytes);
+
+  /**
+   * Allocate a buffer whose size is the total of all the add()s made.
+   *
+   * <p>The allocation request can still fail, even if the amount of space
+   * requested is available, if the allocation cannot be made contiguously.
+   *
+   * @return the buffer, or null, if the request cannot be satisfied
+   * @throws IllegalStateException if called after the reservation has been closed, or if
+   *     called more than once
+   */
+  public DrillBuf buffer() {
+    Preconditions.checkState(!closed, "Attempt to allocate after closed");
+    Preconditions.checkState(!used, "Attempt to allocate more than once");
+
+    final DrillBuf drillBuf = allocate(nBytes);
+    // Marked used only once allocate() has returned; if it throws, close() will still
+    // hand the reservation back through releaseReservation().
+    used = true;
+    return drillBuf;
+  }
+
+  /**
+   * Allocate a buffer of the requested size.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the size of the buffer requested
+   * @return the buffer, or null, if the request cannot be satisfied
+   */
+  protected abstract DrillBuf allocate(int nBytes);
+
+  /**
+   * Close the reservation. If the reservation was never used, the reserved bytes are
+   * handed back through releaseReservation(). Closing more than once has no effect.
+   */
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    if (!used) {
+      releaseReservation(nBytes);
+    }
+
+    closed = true;
+  }
+
+  /**
+   * Return the reservation back to the allocator without having used it.
+   *
+   * @param nBytes the size of the reservation
+   */
+  protected abstract void releaseReservation(int nBytes);
+
+  /**
+   * Get the current size of the reservation (the sum of all the add()s).
+   *
+   * @return size of the current reservation
+   */
+  public int getSize() {
+    return nBytes;
+  }
+
+  /**
+   * Return whether or not the reservation has been used.
+   *
+   * @return whether or not the reservation has been used
+   */
+  public boolean isUsed() {
+    return used;
+  }
+
+  /**
+   * Return whether or not the reservation has been closed.
+   *
+   * @return whether or not the reservation has been closed
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
new file mode 100644
index 0000000..8bf2a99
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Exception thrown when a closed BufferAllocator is used. Note
+ * this is an unchecked exception.
+ */
+@SuppressWarnings("serial")
+public class AllocatorClosedException extends RuntimeException {
+ /**
+ * Creates the exception with the given message.
+ *
+ * @param message string associated with the cause
+ */
+ public AllocatorClosedException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
new file mode 100644
index 0000000..f2d3df9
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+/**
+ * This interface provides a means for allocator owners to inject services
+ * required by allocators, as well as to identify themselves for debugging purposes.
+ * Identification is done by overriding the implementation of
+ * {@link Object#toString()}.
+ */
+public interface AllocatorOwner {
+ /**
+ * Get the current ExecutionControls from the allocator's owner.
+ *
+ * @return the current execution controls; may return null if this isn't
+ * possible
+ */
+ ExecutionControls getExecutionControls();
+
+ @Deprecated // Only for TopLevelAllocator and its friends.
+ FragmentContext getFragmentContext();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
new file mode 100644
index 0000000..00d8c4f
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * JMX bean interface exposing global allocator statistics.
+ */
+// TODO use Stats infrastructure instead of JMX beans
+public interface AllocatorsStatsMXBean {
+  /**
+   * Returns the maximum amount of direct memory that can be used.
+   *
+   * <p>This is determined by what is available, or by the drillbit
+   * configuration, if it specifies a value.</p>
+   *
+   * @return the amount of direct memory that can be used
+   */
+  long getMaxDirectMemory();
+}