You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:27 UTC

[04/13] drill git commit: DRILL-4134: Add new allocator

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index b85502b..d244b26 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,28 +27,40 @@ import java.nio.ByteOrder;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.HistoricalLog;
 import org.apache.drill.exec.memory.Accountor;
-import org.apache.drill.exec.memory.BoundsChecking;
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.BufferManager;
-
+import org.apache.drill.exec.memory.BufferLedger;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.Pointer;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
 public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
 
-  private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
+  private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
+  private static final boolean DEBUG = BaseAllocator.isDebug();
+  private static final AtomicInteger idGenerator = new AtomicInteger(0);
 
-  private final ByteBuf b;
+  private final ByteBuf byteBuf;
   private final long addr;
   private final int offset;
-  private final boolean rootBuffer;
-  private final AtomicLong rootRefCnt = new AtomicLong(1);
+  private final int flags;
+  private final AtomicInteger rootRefCnt;
   private volatile BufferAllocator allocator;
-  private volatile Accountor acct;
-  private volatile int length;
 
   // TODO - cleanup
   // The code is partly shared and partly copy-pasted between
@@ -56,28 +68,153 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   // to share code and to remove the hacky code here to use only
   // one of these types at a time and use null checks to find out
   // which.
+  private final boolean oldWorld; // Indicates that we're operating with TopLevelAllocator.
+  private final boolean rootBuffer;
+  private volatile Accountor acct;
   private BufferManager bufManager;
+  @Deprecated private OperatorContext operatorContext;
+  @Deprecated private FragmentContext fragmentContext;
+
+  private volatile BufferLedger bufferLedger;
+  private volatile int length; // TODO this just seems to duplicate .capacity()
+
+  // members used purely for debugging
+  // TODO once we have a reduced number of constructors, move these to DEBUG clauses in them
+  private final int id = idGenerator.incrementAndGet();
+  private final HistoricalLog historicalLog = DEBUG ? new HistoricalLog(4, "DrillBuf[%d]", id) : null;
+  private final static IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>> unwrappedMap =
+      DEBUG ? new IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>>() : null;
+
+  // TODO(cwestin) javadoc
+  private void unwrappedPut() {
+    final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
+    synchronized(unwrappedMap) {
+      Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
+      if (drillBufs == null) {
+        drillBufs = new LinkedList<DrillBuf>();
+        unwrappedMap.put(udle, drillBufs);
+      }
+
+      drillBufs.add(this);
+    }
+  }
+
+  // TODO(cwestin) javadoc
+  public static Collection<DrillBuf> unwrappedGet(final UnsafeDirectLittleEndian udle) {
+    synchronized(unwrappedMap) {
+      final Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
+      if (drillBufs == null) {
+        return Collections.emptyList();
+      }
+      return new LinkedList<DrillBuf>(drillBufs);
+    }
+  }
+
+  // TODO(cwestin) javadoc
+  private static boolean unwrappedRemove(final DrillBuf drillBuf) {
+    final ByteBuf byteBuf = drillBuf.unwrap();
+    if (!(byteBuf instanceof UnsafeDirectLittleEndian)) {
+      return false;
+    }
+
+    final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
+    synchronized(unwrappedMap) {
+      Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
+      if (drillBufs == null) {
+        return false;
+      }
+      final Object object = drillBufs.remove(drillBuf);
+      if (drillBufs.isEmpty()) {
+        unwrappedMap.remove(udle);
+      }
+      return object != null;
+    }
+  }
 
   public DrillBuf(BufferAllocator allocator, Accountor a, UnsafeDirectLittleEndian b) {
     super(b.maxCapacity());
-    this.b = b;
+    this.byteBuf = b;
     this.addr = b.memoryAddress();
     this.acct = a;
     this.length = b.capacity();
     this.offset = 0;
     this.rootBuffer = true;
     this.allocator = allocator;
+
+    // members from the new world order
+    flags = 0;
+    rootRefCnt = null;
+    oldWorld = true;
+  }
+
+  // TODO(cwestin) javadoc
+  public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
+      final UnsafeDirectLittleEndian byteBuf) {
+    super(byteBuf.maxCapacity());
+    this.byteBuf = byteBuf;
+    byteBuf.retain(1);
+    this.bufferLedger = bufferLedger;
+    addr = byteBuf.memoryAddress();
+    allocator = bufferAllocator;
+    length = byteBuf.capacity();
+    offset = 0;
+    flags = 0;
+    rootRefCnt = new AtomicInteger(1);
+    oldWorld = false;
+
+    // members from the old world order
+    rootBuffer = false;
+    acct = null;
+
+    if (DEBUG) {
+      unwrappedPut();
+      historicalLog.recordEvent(
+          "DrillBuf(BufferLedger, BufferAllocator[%d], UnsafeDirectLittleEndian[identityHashCode == "
+              + "%d](%s)) => rootRefCnt identityHashCode == %d",
+              bufferAllocator.getId(), System.identityHashCode(byteBuf), byteBuf.toString(),
+              System.identityHashCode(rootRefCnt));
+    }
   }
 
   private DrillBuf(BufferAllocator allocator, Accountor a) {
     super(0);
-    this.b = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
+    this.byteBuf = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
     this.allocator = allocator;
     this.acct = a;
     this.length = 0;
     this.addr = 0;
     this.rootBuffer = false;
     this.offset = 0;
+
+    // members from the new world order
+    flags = 0;
+    rootRefCnt = null;
+    oldWorld = true;
+  }
+
+  private DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
+    super(0);
+    this.bufferLedger = bufferLedger;
+    allocator = bufferAllocator;
+
+    byteBuf = new EmptyByteBuf(bufferLedger.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
+    length = 0;
+    addr = 0;
+    flags = 0;
+    rootRefCnt = new AtomicInteger(1);
+    offset = 0;
+
+    // members from the old world order
+    rootBuffer = false;
+    acct = null;
+    oldWorld = false;
+
+    if (DEBUG) {
+      // We don't put the empty buffers in the unwrappedMap.
+      historicalLog.recordEvent(
+          "DrillBuf(BufferLedger, BufferAllocator[%d])  => rootRefCnt identityHashCode == %d",
+          bufferAllocator.getId(), System.identityHashCode(rootRefCnt));
+    }
   }
 
   /**
@@ -96,7 +233,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     b.unwrap().unwrap().retain();
   }
 
-
   private DrillBuf(DrillBuf buffer, int index, int length) {
     this(buffer.allocator, null, buffer, buffer, index, length, false);
   }
@@ -105,6 +241,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     ByteBuf underlying = b.unwrap().unwrap();
     return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
   }
+
   private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
     super(length);
     if (index < 0 || index > buffer.capacity() - length) {
@@ -114,16 +251,138 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     this.length = length;
     writerIndex(length);
 
-    this.b = replacement;
+    this.byteBuf = replacement;
     this.addr = buffer.memoryAddress() + index;
     this.offset = index;
     this.acct = a;
     this.length = length;
     this.rootBuffer = root;
     this.allocator = allocator;
+
+    // members from the new world order
+    flags = 0;
+    rootRefCnt = null;
+    oldWorld = true;
   }
 
+  /**
+   * Indicate a shared refcount, as per http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5
+   */
+  private final static int F_DERIVED = 0x0002;
+
+  // TODO(cwestin) javadoc
+  /**
+   * Used for sharing.
+   *
+   * @param bufferLedger
+   * @param bufferAllocator
+   * @param originalBuf
+   * @param index
+   * @param length
+   * @param flags
+   */
+  public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
+      final DrillBuf originalBuf, final int index, final int length, final int flags) {
+    this(bufferAllocator, bufferLedger, getUnderlyingUdle(originalBuf),
+        originalBuf, index + originalBuf.offset, length, flags);
+  }
+
+  /**
+   * Unwraps a DrillBuf until the underlying UnsafeDirectLittleEndian buffer is
+   * found.
+   *
+   * @param originalBuf the original DrillBuf
+   * @return the underlying UnsafeDirectLittleEndian ByteBuf
+   */
+  private static ByteBuf getUnderlyingUdle(final DrillBuf originalBuf) {
+    int count = 1;
+    ByteBuf unwrapped = originalBuf.unwrap();
+    while(!(unwrapped instanceof UnsafeDirectLittleEndian)
+        && (!(unwrapped instanceof EmptyByteBuf))) {
+      unwrapped = unwrapped.unwrap();
+      ++count;
+    }
+
+    if (DEBUG) {
+      if (count > 1) {
+        throw new IllegalStateException("UnsafeDirectLittleEndian is wrapped more than one level");
+      }
+    }
+
+    return unwrapped;
+  }
+
+  // TODO(cwestin) javadoc
+  /*
+   * TODO the replacement argument becomes an UnsafeDirectLittleEndian;
+   * buffer argument may go away if it is determined to be unnecessary after all
+   * the deprecated stuff is removed (I suspect only the replacement argument is
+   * necessary then).
+   */
+  private DrillBuf(BufferAllocator allocator, BufferLedger bufferLedger,
+      ByteBuf replacement, DrillBuf buffer, int index, int length, int flags) {
+    super(replacement.maxCapacity());
+
+    // members from the old world order
+    rootBuffer = false;
+    acct = null;
+    oldWorld = false;
+
+    if (index < 0 || index > (replacement.maxCapacity() - length)) {
+      throw new IndexOutOfBoundsException(replacement.toString() + ".slice(" + index + ", " + length + ')');
+    }
+
+    this.flags = flags;
+
+    this.length = length; // capacity()
+    writerIndex(length);
+
+    byteBuf = replacement;
+    if ((flags & F_DERIVED) == 0) {
+      replacement.retain(1);
+    }
+
+    addr = replacement.memoryAddress() + index;
+    offset = index;
+    this.bufferLedger = bufferLedger;
+    if (!(buffer instanceof DrillBuf)) {
+      throw new IllegalArgumentException("DrillBuf slicing can only be performed on other DrillBufs");
+    }
+
+    if ((flags & F_DERIVED) != 0) {
+      final DrillBuf rootBuf = (DrillBuf) buffer;
+      rootRefCnt = rootBuf.rootRefCnt;
+    } else {
+      rootRefCnt = new AtomicInteger(1);
+    }
+
+    this.allocator = allocator;
+
+    if (DEBUG) {
+      unwrappedPut();
+      historicalLog.recordEvent(
+          "DrillBuf(BufferAllocator[%d], BufferLedger, ByteBuf[identityHashCode == "
+              + "%d](%s), DrillBuf[%d], index = %d, length = %d, flags = 0x%08x)"
+              + " => rootRefCnt identityHashCode == %d",
+          allocator.getId(), System.identityHashCode(replacement), replacement.toString(),
+          buffer.id, index, length, flags, System.identityHashCode(rootRefCnt));
+    }
+  }
+
+  @Deprecated
+  public void setOperatorContext(OperatorContext c) {
+    this.operatorContext = c;
+  }
+
+  @Deprecated
+  public void setFragmentContext(FragmentContext c) {
+    this.fragmentContext = c;
+  }
+
+  // TODO(DRILL-3331)
   public void setBufferManager(BufferManager bufManager) {
+    Preconditions.checkState(this.bufManager == null,
+        "the BufferManager for a buffer can only be set once");
     this.bufManager = bufManager;
   }
 
@@ -131,47 +390,50 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return allocator;
   }
 
-  public DrillBuf reallocIfNeeded(int size) {
+  public DrillBuf reallocIfNeeded(final int size) {
+    Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
+
     if (this.capacity() >= size) {
       return this;
     }
 
-    if (bufManager != null) {
+    if (operatorContext != null) {
+      return operatorContext.replace(this, size);
+    } else if(fragmentContext != null) {
+      return fragmentContext.replace(this, size);
+    } else if (bufManager != null) {
       return bufManager.replace(this, size);
     } else {
       throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
     }
-
   }
 
   @Override
   public int refCnt() {
-    if(rootBuffer){
-      return (int) this.rootRefCnt.get();
-    }else{
-      return b.refCnt();
+    if (oldWorld) {
+      if(rootBuffer){
+        return (int) this.rootRefCnt.get();
+      }else{
+        return byteBuf.refCnt();
+      }
     }
 
+    return rootRefCnt.get();
   }
 
   private long addr(int index) {
     return addr + index;
   }
 
-  private final void checkIndexD(int index) {
-    ensureAccessible();
-    if (index < 0 || index >= capacity()) {
-      throw new IndexOutOfBoundsException(String.format(
-              "index: %d (expected: range(0, %d))", index, capacity()));
-    }
-  }
-
   private final void checkIndexD(int index, int fieldLength) {
     ensureAccessible();
     if (fieldLength < 0) {
       throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
     }
     if (index < 0 || index > capacity() - fieldLength) {
+      if (DEBUG) {
+        historicalLog.logHistory(logger);
+      }
       throw new IndexOutOfBoundsException(String.format(
               "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
     }
@@ -186,7 +448,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    * @param start The starting position of the bytes to be read.
    * @param end The exclusive endpoint of the bytes to be read.
    */
-  public void checkBytes(int start, int end){
+  public void checkBytes(int start, int end) {
     if (BOUNDS_CHECKING_ENABLED) {
       checkIndexD(start, end - start);
     }
@@ -198,18 +460,51 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     }
   }
 
-  private void chk(int index) {
-    if (BOUNDS_CHECKING_ENABLED) {
-      checkIndexD(index);
-    }
-  }
-
   private void ensure(int width) {
     if (BOUNDS_CHECKING_ENABLED) {
       ensureWritable(width);
     }
   }
 
+  /**
+   * Used by allocators to transfer ownership from one allocator to another.
+   *
+   * @param newLedger the new ledger the buffer should use going forward
+   * @param newAllocator the new allocator
+   * @return whether or not the buffer fits the receiving allocator's allocation limit
+   */
+  public boolean transferTo(final BufferAllocator newAllocator, final BufferLedger newLedger) {
+    final Pointer<BufferLedger> pNewLedger = new Pointer<>(newLedger);
+    final boolean fitsAllocation = bufferLedger.transferTo(newAllocator, pNewLedger, this);
+    allocator = newAllocator;
+    bufferLedger = pNewLedger.value;
+    return fitsAllocation;
+  }
+
+  /**
+   * DrillBuf's implementation of sharing buffer functionality, to be accessed from
+   * {@link BufferAllocator#shareOwnership(DrillBuf, Pointer)}. See that function
+   * for more information.
+   *
+   * @param otherLedger the ledger belonging to the other allocator to share with
+   * @param otherAllocator the other allocator to be shared with
+   * @param index the starting index (for slicing capability)
+   * @param length the length (for slicing capability)
+   * @return the new DrillBuf (wrapper)
+   */
+  public DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+      final int index, final int length) {
+    return shareWith(otherLedger, otherAllocator, index, length, 0);
+  }
+
+  // TODO(cwestin) javadoc
+  private DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+      final int index, final int length, final int flags) {
+    final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
+    bufferLedger = bufferLedger.shareWith(pDrillBuf, otherLedger, otherAllocator, this, index, length, flags);
+    return pDrillBuf.value;
+  }
+
   public boolean transferAccounting(Accountor target) {
     if (rootBuffer) {
       boolean outcome = acct.transferTo(target, this, length);
@@ -221,7 +516,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   @Override
-  public synchronized boolean release() {
+  public boolean release() {
     return release(1);
   }
 
@@ -230,20 +525,44 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    */
   @Override
   public synchronized boolean release(int decrement) {
+    Preconditions.checkArgument(decrement > 0,
+        "release(%d) argument is not positive", decrement);
+    if (DEBUG) {
+      historicalLog.recordEvent("release(%d)", decrement);
+    }
 
-    if(rootBuffer){
-      final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
-      Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
-      if (newRefCnt == 0) {
-        b.release(decrement);
-        acct.release(this, length);
-        return true;
+    if (oldWorld) {
+      if(rootBuffer){
+        final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
+        Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
+        if (newRefCnt == 0) {
+          byteBuf.release(decrement);
+          acct.release(this, length);
+          return true;
+        }else{
+          return false;
+        }
       }else{
-        return false;
+        return byteBuf.release(decrement);
       }
-    }else{
-      return b.release(decrement);
     }
+
+    final int refCnt = rootRefCnt.addAndGet(-decrement);
+    Preconditions.checkState(refCnt >= 0, "DrillBuf[%d] refCnt has gone negative", id);
+    if (refCnt == 0) {
+      bufferLedger.release(this);
+
+      if (DEBUG) {
+        unwrappedRemove(this);
+      }
+
+      // release the underlying buffer
+      byteBuf.release(1);
+
+      return true;
+    }
+
+    return false;
   }
 
   @Override
@@ -253,31 +572,45 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public synchronized ByteBuf capacity(int newCapacity) {
-    if (rootBuffer) {
-      if (newCapacity == length) {
-        return this;
-      } else if (newCapacity < length) {
-        b.capacity(newCapacity);
-        int diff = length - b.capacity();
-        acct.releasePartial(this, diff);
-        this.length = length - diff;
-        return this;
+    if (oldWorld) {
+      if (rootBuffer) {
+        if (newCapacity == length) {
+          return this;
+        } else if (newCapacity < length) {
+          byteBuf.capacity(newCapacity);
+          int diff = length - byteBuf.capacity();
+          acct.releasePartial(this, diff);
+          this.length = length - diff;
+          return this;
+        } else {
+          throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
+        }
       } else {
-        throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
+        throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
       }
-    } else {
-      throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
     }
-  }
 
-  @Override
-  public int maxCapacity() {
-    return length;
+    if ((flags & F_DERIVED) != 0) {
+      throw new UnsupportedOperationException("Derived buffers don't support resizing.");
+    }
+
+    if (newCapacity == length) {
+      return this;
+    }
+
+    if (newCapacity < length) {
+      byteBuf.capacity(newCapacity);
+      final int diff = length - byteBuf.capacity();
+      length -= diff;
+      return this;
+    }
+
+    throw new UnsupportedOperationException("Buffers don't support resizing that increases the size.");
   }
 
   @Override
   public ByteBufAllocator alloc() {
-    return b.alloc();
+    return byteBuf.alloc();
   }
 
   @Override
@@ -287,14 +620,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf order(ByteOrder endianness) {
-    // if(endianness != ByteOrder.LITTLE_ENDIAN) throw new
-    // UnsupportedOperationException("Drill buffers only support little endian.");
     return this;
   }
 
   @Override
   public ByteBuf unwrap() {
-    return b;
+    return byteBuf;
   }
 
   @Override
@@ -309,7 +640,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf readSlice(int length) {
-    ByteBuf slice = slice(readerIndex(), length);
+    final ByteBuf slice = slice(readerIndex(), length);
     readerIndex(readerIndex() + length);
     return slice;
   }
@@ -329,16 +660,42 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return slice(readerIndex(), readableBytes());
   }
 
+  public static String bufferState(final ByteBuf buf) {
+    final int cap = buf.capacity();
+    final int mcap = buf.maxCapacity();
+    final int ri = buf.readerIndex();
+    final int rb = buf.readableBytes();
+    final int wi = buf.writerIndex();
+    final int wb = buf.writableBytes();
+    return String.format("cap/max: %d/%d, ri: %d, rb: %d, wi: %d, wb: %d",
+        cap, mcap, ri, rb, wi, wb);
+  }
+
   @Override
   public DrillBuf slice(int index, int length) {
-    DrillBuf buf = new DrillBuf(this, index, length);
-    buf.writerIndex = length;
+    if (oldWorld) {
+      DrillBuf buf = new DrillBuf(this, index, length);
+      buf.writerIndex = length;
+      return buf;
+    }
+
+    /*
+     * Re the behavior of reference counting,
+     * see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which explains
+     * that derived buffers share their reference count with their parent
+     */
+    final DrillBuf buf = shareWith(bufferLedger, allocator, index, length, F_DERIVED);
+    buf.writerIndex(length);
     return buf;
   }
 
   @Override
   public DrillBuf duplicate() {
-    return new DrillBuf(this, 0, length);
+    if (oldWorld) {
+      return new DrillBuf(this, 0, length);
+    }
+
+    return slice(0, length);
   }
 
   @Override
@@ -353,12 +710,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuffer nioBuffer(int index, int length) {
-    return b.nioBuffer(offset + index, length);
+    return byteBuf.nioBuffer(offset + index, length);
   }
 
   @Override
   public ByteBuffer internalNioBuffer(int index, int length) {
-    return b.internalNioBuffer(offset + index, length);
+    return byteBuf.internalNioBuffer(offset + index, length);
   }
 
   @Override
@@ -373,17 +730,17 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public boolean hasArray() {
-    return b.hasArray();
+    return byteBuf.hasArray();
   }
 
   @Override
   public byte[] array() {
-    return b.array();
+    return byteBuf.array();
   }
 
   @Override
   public int arrayOffset() {
-    return b.arrayOffset();
+    return byteBuf.arrayOffset();
   }
 
   @Override
@@ -397,17 +754,26 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   @Override
+  public String toString() {
+    return toString(0, 0, Charsets.UTF_8);
+  }
+
+  @Override
   public String toString(Charset charset) {
-      return toString(readerIndex, readableBytes(), charset);
+    return toString(readerIndex, readableBytes(), charset);
   }
 
   @Override
   public String toString(int index, int length, Charset charset) {
+    final String basics =
+        String.format("{DrillBuf[%d], udle identityHashCode == %d, rootRefCnt identityHashCode == %d}",
+            id, System.identityHashCode(byteBuf), System.identityHashCode(rootRefCnt));
+
     if (length == 0) {
-      return "";
+      return basics;
     }
 
-    ByteBuffer nioBuffer;
+    final ByteBuffer nioBuffer;
     if (nioBufferCount() == 1) {
       nioBuffer = nioBuffer(index, length);
     } else {
@@ -416,7 +782,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       nioBuffer.flip();
     }
 
-    return ByteBufUtil.decodeString(nioBuffer, charset);
+    return basics + '\n' + ByteBufUtil.decodeString(nioBuffer, charset);
   }
 
   @Override
@@ -431,12 +797,22 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   @Override
-  public synchronized ByteBuf retain(int increment) {
-    if(rootBuffer){
-      this.rootRefCnt.addAndGet(increment);
-    }else{
-      b.retain(increment);
+  public ByteBuf retain(int increment) {
+    Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
+    if (DEBUG) {
+      historicalLog.recordEvent("retain(%d)", increment);
     }
+
+    if (oldWorld) {
+      if(rootBuffer){
+        this.rootRefCnt.addAndGet(increment);
+      }else{
+        byteBuf.retain(increment);
+      }
+      return this;
+    }
+
+    rootRefCnt.addAndGet(increment);
     return this;
   }
 
@@ -448,7 +824,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public long getLong(int index) {
     chk(index, 8);
-    long v = PlatformDependent.getLong(addr(index));
+    final long v = PlatformDependent.getLong(addr(index));
     return v;
   }
 
@@ -475,7 +851,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public int getInt(int index) {
     chk(index, 4);
-    int v = PlatformDependent.getInt(addr(index));
+    final int v = PlatformDependent.getInt(addr(index));
     return v;
   }
 
@@ -583,13 +959,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
-    b.getBytes(index + offset,  dst, dstIndex, length);
+    byteBuf.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf getBytes(int index, ByteBuffer dst) {
-    b.getBytes(index + offset, dst);
+    byteBuf.getBytes(index + offset, dst);
     return this;
   }
 
@@ -657,19 +1033,19 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
-    b.getBytes(index + offset, dst, dstIndex, length);
+    byteBuf.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
-    b.getBytes(index + offset, out, length);
+    byteBuf.getBytes(index + offset, out, length);
     return this;
   }
 
   @Override
   protected int _getUnsignedMedium(int index) {
-    long addr = addr(index);
+    final long addr = addr(index);
     return (PlatformDependent.getByte(addr) & 0xff) << 16 |
             (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
             PlatformDependent.getByte(addr + 2) & 0xff;
@@ -677,12 +1053,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
-    return b.getBytes(index + offset, out, length);
+    return byteBuf.getBytes(index + offset, out, length);
   }
 
   @Override
   public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
-    b.setBytes(index + offset, src, srcIndex, length);
+    byteBuf.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
@@ -693,12 +1069,12 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
           length);
     } else {
       if (srcIndex == 0 && src.capacity() == length) {
-        b.setBytes(index + offset, src);
+        byteBuf.setBytes(index + offset, src);
       } else {
         ByteBuffer newBuf = src.duplicate();
         newBuf.position(srcIndex);
         newBuf.limit(srcIndex + length);
-        b.setBytes(index + offset, src);
+        byteBuf.setBytes(index + offset, src);
       }
     }
 
@@ -707,24 +1083,24 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
-    b.setBytes(index + offset, src, srcIndex, length);
+    byteBuf.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
   @Override
   public ByteBuf setBytes(int index, ByteBuffer src) {
-    b.setBytes(index + offset, src);
+    byteBuf.setBytes(index + offset, src);
     return this;
   }
 
   @Override
   public int setBytes(int index, InputStream in, int length) throws IOException {
-    return b.setBytes(index + offset, in, length);
+    return byteBuf.setBytes(index + offset, in, length);
   }
 
   @Override
   public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
-    return b.setBytes(index + offset, in, length);
+    return byteBuf.setBytes(index + offset, in, length);
   }
 
   @Override
@@ -737,8 +1113,33 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return new DrillBuf(allocator, a);
   }
 
+  public static DrillBuf getEmpty(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
+    return new DrillBuf(bufferLedger, bufferAllocator);
+  }
+
+  /**
+   * Find out if this is a "root buffer." This is obsolete terminology
+   * based on the original implementation of DrillBuf, which would layer
+   * DrillBufs on top of other DrillBufs when slicing (or duplicating).
+   * The buffer at the bottom of the layer was the "root buffer." However,
+   * the current implementation flattens such references to always make
+   * DrillBufs that are wrap a single buffer underneath, and slices and
+   * their original source have a shared fate as per
+   * http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, so
+   * this concept isn't really meaningful anymore. But there are callers
+   * that want to know a buffer's original size, and whether or not it
+   * is "primal" in some sense. Perhaps this just needs a new name that
+   * indicates that the buffer was an "original" and not a slice.
+   *
+   * @return whether or not the buffer is an original
+   */
+  @Deprecated
   public boolean isRootBuffer() {
-    return rootBuffer;
+    if (oldWorld) {
+      return rootBuffer;
+    }
+
+    return (flags & F_DERIVED) == 0;
   }
 
   @Override
@@ -746,4 +1147,72 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     release();
   }
 
+  /**
+   * Indicates whether this DrillBuf and the supplied one have a "shared fate."
+   * Having a "shared fate" indicates that the two DrillBufs share a reference
+   * count, and will both be released at the same time if either of them is
+   * released.
+   * @param otherBuf the other buffer to check against
+   * @return true if the two buffers have a shared fate, false otherwise
+   */
+  public boolean hasSharedFate(final DrillBuf otherBuf) {
+    return rootRefCnt == otherBuf.rootRefCnt;
+  }
+
+  private final static int LOG_BYTES_PER_ROW = 10;
+  /**
+   * Log this buffer's byte contents in the form of a hex dump.
+   *
+   * @param logger where to log to
+   * @param start the starting byte index
+   * @param length how many bytes to log
+   */
+  public void logBytes(final Logger logger, final int start, final int length) {
+    final int roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
+
+    final StringBuilder sb = new StringBuilder("buffer byte dump\n");
+    int index = roundedStart;
+    for(int nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) {
+      sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1));
+      for(int i = 0; i < LOG_BYTES_PER_ROW; ++i) {
+        try {
+          final byte b = getByte(index++);
+          sb.append(String.format(" 0x%02x", b));
+        } catch(IndexOutOfBoundsException ioob) {
+          sb.append(" <ioob>");
+        }
+      }
+      sb.append('\n');
+    }
+    logger.trace(sb.toString());
+  }
+
+  /**
+   * Get the integer id assigned to this DrillBuf for debugging purposes.
+   *
+   * @return integer id
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * Log this buffer's history.
+   *
+   * @param logger the logger to use
+   */
+  public void logHistory(final Logger logger) {
+    if (historicalLog == null) {
+      logger.warn("DrillBuf[{}] historicalLog not available", id);
+    } else {
+      historicalLog.logHistory(logger);
+    }
+  }
+
+  public void logHistoryForUdle(final Logger logger, final UnsafeDirectLittleEndian udle) {
+    final Collection<DrillBuf> drillBufs = unwrappedGet(udle);
+    for(final DrillBuf drillBuf : drillBufs) {
+      drillBuf.logHistory(logger);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
index 53ca91c..b8d0fb2 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -17,10 +17,11 @@
  */
 package io.netty.buffer;
 
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.AllocationReservation;
+import org.apache.drill.exec.memory.AllocatorOwner;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.LimitConsumer;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.util.Pointer;
 
 class FakeAllocator implements BufferAllocator {
@@ -46,7 +47,7 @@ class FakeAllocator implements BufferAllocator {
   }
 
   @Override
-  public BufferAllocator getChildAllocator(LimitConsumer consumer, long initialReservation, long maximumReservation,
+  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
                                            boolean applyFragmentLimit)
       throws OutOfMemoryException {
     throw new UnsupportedOperationException();
@@ -63,22 +64,12 @@ class FakeAllocator implements BufferAllocator {
   }
 
   @Override
-  public PreAllocator getNewPreAllocator() {
+  public void setFragmentLimit(long l) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void resetLimits() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setLimit(long l) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLimit(){
+  public long getFragmentLimit(){
     throw new UnsupportedOperationException();
   }
 
@@ -96,9 +87,10 @@ class FakeAllocator implements BufferAllocator {
     return 0;
   }
 
-  static class FakeAccountor implements Accountor {
+  static class FakeAccountor extends Accountor {
 
     public FakeAccountor() {
+      super(null, false, null, null, 0, 0, true);
     }
 
     @Override
@@ -138,50 +130,38 @@ class FakeAllocator implements BufferAllocator {
 
     @Override
     public void releasePartial(DrillBuf buf, long size) {
-      throw new UnsupportedOperationException();
-    }
 
-    @Override
-    public void release(DrillBuf buf, long size) {
-      throw new UnsupportedOperationException();
     }
 
     @Override
-    public void close() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean transferIn(DrillBuf buf, long size) {
-      return false;
-    }
-
-    @Override
-    public long getPeakMemoryAllocation() {
-      return 0;
-    }
+    public void release(DrillBuf buf, long size) {
 
-    @Override
-    public long resetFragmentLimits() {
-      return 0;
     }
 
     @Override
-    public void setFragmentLimit(long add) {
-      throw new UnsupportedOperationException();
-    }
+    public void close() {
 
-    @Override
-    public long getFragmentLimit() {
-      return 0;
     }
+  }
 
+  @Override
+  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+      long initReservation, long maxAllocation, int flags) {
+    throw new UnsupportedOperationException();
+  }
 
+  @Override
+  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+  public int getId() {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public AllocationReservation newReservation() {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 559f06d..419aef3 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -27,10 +27,68 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
   private final AbstractByteBuf wrapped;
   private final long memoryAddress;
+  private static final boolean TRACK_BUFFERS = false;
   private AtomicLong bufferCount;
   private AtomicLong bufferSize;
   private long initCap = -1;
 
+  private final static IdentityHashMap<UnsafeDirectLittleEndian, StackTrace> bufferMap = new IdentityHashMap<>();
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = super.release(decrement);
+    if (TRACK_BUFFERS) {
+      if (released) {
+        final Object object;
+        synchronized (bufferMap) {
+          object = bufferMap.remove(this);
+        }
+        if (object == null) {
+          throw new IllegalStateException("no such buffer");
+        }
+
+        if (initCap != -1) {
+          bufferCount.decrementAndGet();
+          bufferSize.addAndGet(-initCap);
+        }
+      }
+    }
+
+    return released;
+  }
+
+
+  public static int getBufferCount() {
+    return bufferMap.size();
+  }
+
+  public static void releaseBuffers() {
+    synchronized(bufferMap) {
+      final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
+      final LinkedList<UnsafeDirectLittleEndian> bufferList = new LinkedList<>(bufferSet);
+      while(!bufferList.isEmpty()) {
+        final UnsafeDirectLittleEndian udle = bufferList.removeFirst();
+        udle.release(udle.refCnt());
+      }
+    }
+  }
+
+  public static void logBuffers(final Logger logger) {
+    synchronized (bufferMap) {
+      int count = 0;
+      final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
+      for (final UnsafeDirectLittleEndian udle : bufferSet) {
+        final StackTrace stackTrace = bufferMap.get(udle);
+        ++count;
+        logger.debug("#" + count + " active buffer allocated at\n" + stackTrace);
+      }
+    }
+  }
   UnsafeDirectLittleEndian(LargeBuffer buf) {
     this(buf, true);
   }
@@ -50,7 +108,7 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
       throw new IllegalStateException("Drill only runs on LittleEndian systems.");
     }
     wrapped = buf;
-    this.memoryAddress = buf.memoryAddress();
+    memoryAddress = buf.memoryAddress();
   }
     private long addr(int index) {
         return memoryAddress + index;

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index a708e92..7d14b94 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -20,121 +20,167 @@ package org.apache.drill.exec.memory;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.DrillBuf;
 
-import java.io.Closeable;
-
-import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.util.Pointer;
 
 /**
- * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
+ * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
  */
-public interface BufferAllocator extends Closeable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferAllocator.class);
-
+public interface BufferAllocator extends AutoCloseable {
   /**
    * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
-   * requested size for rounding purposes. However, the buffers capacity will be set to the configured size.
+   * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
    *
-   * @param size
-   *          The size in bytes.
-   * @return A new ByteBuf.
-   * @throws OutOfMemoryException if buffer cannot be allocated
+   * @param size The size in bytes.
+   * @return a new DrillBuf, or null if the request can't be satisfied
+   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
    */
-  public abstract DrillBuf buffer(int size);
+  public DrillBuf buffer(int size);
 
   /**
    * Allocate a new or reused buffer within provided range. Note that the buffer may technically be larger than the
-   * requested size for rounding purposes. However, the buffers capacity will be set to the configured size.
+   * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
    *
    * @param minSize The minimum size in bytes.
    * @param maxSize The maximum size in bytes.
-   * @return A new ByteBuf.
-   * @throws OutOfMemoryException if buffer cannot be allocated
+   * @return a new DrillBuf, or null if the request can't be satisfied
+   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
    */
-  public abstract DrillBuf buffer(int minSize, int maxSize);
+  public DrillBuf buffer(int minSize, int maxSize);
 
-  public abstract ByteBufAllocator getUnderlyingAllocator();
+  /**
+   * Returns the allocator this allocator falls back to when it needs more memory.
+   *
+   * @return the underlying allocator used by this allocator
+   */
+  public ByteBufAllocator getUnderlyingAllocator();
 
   /**
    * Create a child allocator nested below this one.
    *
-   * @param context
-   *          - BufferManager associated with the new child allocator
-   * @param initialReservation
-   *          - specified in bytes
-   * @param maximumReservation
-   *          - specified in bytes
-   * @param applyFragmentLimit
-   *          - flag to conditionally enable fragment memory limits
+   * @param context - the owner or this allocator
+   * @param initialReservation - specified in bytes
+   * @param maximumReservation - specified in bytes
+   * @param applyFragmentLimit - flag to conditionally enable fragment memory limits
    * @return - a new buffer allocator owned by the parent it was spawned from
-   * @throws OutOfMemoryException
-   *           - when off-heap memory has been exhausted
    */
-  public abstract BufferAllocator getChildAllocator(LimitConsumer limitListener, long initialReservation,
-      long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException;
+  @Deprecated
+  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
+      long maximumReservation, boolean applyFragmentLimit);
 
   /**
-   * Take over ownership of fragment accounting.  Always takes over ownership.
-   * @param buf
-   * @return false if over allocation.
+   * Flag: this allocator is a limiting sub-tree root, meaning that the maxAllocation for
+   * it applies to all its descendant child allocators. In low memory situations, the limits
+   * for sub-tree roots may be adjusted down so that they evenly share the total amount of
+   * direct memory across all the sub-tree roots.
    */
-  public boolean takeOwnership(DrillBuf buf) ;
+  public final static int F_LIMITING_ROOT = 0x0001;
 
   /**
-   * Take over ownership of fragment accounting.  Always takes over ownership.
-   * @param buf
-   * @return false if over allocation.
+   * Create a new child allocator.
+   *
+   * @param allocatorOwner the allocator owner
+   * @param initReservation the initial space reservation (obtained from this allocator)
+   * @param maxAllocation maximum amount of space the new allocator can allocate
+   * @param flags one or more of BufferAllocator.F_* flags
+   * @return the new allocator, or null if it can't be created
    */
-  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
-
-  public PreAllocator getNewPreAllocator();
-
-  //public void addFragmentContext(FragmentContext c);
+  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+      long initReservation, long maxAllocation, int flags);
 
   /**
-   * For Top Level Allocators. Reset the fragment limits for all allocators
+   * Take over ownership of the given buffer, adjusting accounting accordingly.
+   * This allocator always takes over ownership.
+   *
+   * @param buf the buffer to take over
+   * @return false if over allocation
    */
-  public void resetLimits();
+  public boolean takeOwnership(DrillBuf buf);
 
   /**
-   * For Child allocators to set the Fragment limit for the corresponding fragment allocator.
-   * @param l the new fragment limit
+   * Share ownership of a buffer between allocators.
+   *
+   * @param buf the buffer
+   * @param bufOut a new DrillBuf owned by this allocator, but sharing the same underlying buffer
+   * @return false if over allocation.
    */
-  public void setLimit(long l);
-
-  public long getLimit();
-
+  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
 
   /**
    * Not thread safe.
    *
    * WARNING: unclaimed pre-allocations leak memory. If you call preAllocate(), you must
    * make sure to ultimately try to get the buffer and release it.
+   *
+   * For Child allocators to set their Fragment limits.
+   *
+   * @param fragmentLimit the new fragment limit
    */
-  public interface PreAllocator {
-    public boolean preAllocate(int bytes);
-
-    public DrillBuf getAllocation();
-  }
+  @Deprecated // happens automatically, and via allocation policies
+  public void setFragmentLimit(long fragmentLimit);
 
   /**
-   * @param bytes
-   * @return
+   * Returns the current fragment limit.
+   *
+   * @return the current fragment limit
+   */
+  /*
+   * TODO should be replaced with something more general because of
+   * the availability of multiple allocation policies
+   *
+   * TODO We should also have a getRemainingMemory() so operators
+   * can query how much more is left to allocate. That could be
+   * tricky.
    */
+  @Deprecated
+  public long getFragmentLimit();
 
   /**
+   * Return a unique Id for an allocator. Id's may be recycled after
+   * a long period of time.
    *
+   * <p>Primary use for this is for debugging output.</p>
+   *
+   * @return the allocator's id
    */
+  public int getId();
 
   /**
    * Close and release all buffers generated from this buffer pool.
+   *
+   * <p>When assertions are on, complains if there are any outstanding buffers; to avoid
+   * that, release all buffers before the allocator is closed.
    */
   @Override
-  public abstract void close();
+  public void close() throws Exception;
 
-  public abstract long getAllocatedMemory();
+  /**
+   * Returns the amount of memory currently allocated from this allocator.
+   *
+   * @return the amount of memory currently allocated
+   */
+  public long getAllocatedMemory();
 
-  public abstract long getPeakMemoryAllocation();
+  /**
+   * Returns the peak amount of memory allocated from this allocator.
+   *
+   * @return the peak amount of memory allocated
+   */
+  public long getPeakMemoryAllocation();
 
+  /**
+   * Returns an empty DrillBuf.
+   *
+   * @return an empty DrillBuf
+   */
   public DrillBuf getEmpty();
+
+  /**
+   * Create an allocation reservation. A reservation is a way of building up
+   * a request for a buffer whose size is not known in advance. See
+   * {@see AllocationReservation}.
+   *
+   * @return the newly created reservation
+   */
+  public AllocationReservation newReservation();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
new file mode 100644
index 0000000..4f1a1bd
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Implicitly specifies an allocation policy by providing a factory method to
+ * create an enforcement agent.
+ *
+ * <p>Allocation policies are meant to be global, and may not work properly if
+ * different allocators are given different policies. These are designed to
+ * be supplied to the root-most allocator only, and then shared with descendant
+ * (child) allocators.</p>
+ */
+public interface AllocationPolicy {
+  /**
+   * Create an allocation policy enforcement agent. Each newly created allocator should
+   * call this in order to obtain its own agent.
+   *
+   * @return the newly instantiated agent; if an agent's implementation is stateless,
+   *   this may return a sharable singleton
+   */
+  AllocationPolicyAgent newAgent();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
new file mode 100644
index 0000000..ad51ee6
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Per-allocator enforcement agent for allocation policies; created by
+ * {@link AllocationPolicy#newAgent()}.
+ */
+public interface AllocationPolicyAgent extends AutoCloseable {
+  /**
+   * Checks to see if creating a new allocator using the given specifications
+   * is allowed; should throw an exception if not.
+   *
+   * @param parentAllocator the parent allocator
+   * @param initReservation initial reservation the allocator should have
+   * @param maxAllocation the maximum allocation the allocator will allow
+   * @param flags the allocation option flags
+   * @throws OutOfMemoryException if the new allocator shouldn't be created
+   */
+  void checkNewAllocator(BufferAllocator parentAllocator,
+      long initReservation, long maxAllocation, int flags);
+
+  /**
+   * Get the currently applicable memory limit for the provided allocator.
+   * The interpretation of this value varies with the allocation policy in
+   * use, and each policy should describe what to expect.
+   *
+   * @param bufferAllocator the allocator
+   * @return the memory limit
+   */
+  long getMemoryLimit(BufferAllocator bufferAllocator);
+
+  /**
+   * Initialize the agent for a newly created allocator. Should be called from
+   * the allocator's constructor to initialize the agent for the allocator.
+   *
+   * @param bufferAllocator the newly created allocator.
+   */
+  void initializeAllocator(BufferAllocator bufferAllocator);
+
+  /**
+   * Indicate if any available memory owned by this allocator should
+   * be released to its parent. Allocators may use this to limit the
+   * amount of unused memory they retain for future requests; agents may
+   * request that memory be returned if there is currently a high demand
+   * for memory that other allocators could use if this allocator
+   * doesn't need it.
+   *
+   * @param bufferAllocator
+   * @return true if available memory owned by this allocator should be given
+   *   back to its parent
+   */
+  boolean shouldReleaseToParent(BufferAllocator bufferAllocator);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
new file mode 100644
index 0000000..1803572
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Supports cumulative allocation reservation. Clients may increase the size of
+ * the reservation repeatedly until they call for an allocation of the current
+ * total size. The reservation can only be used once, and will throw an exception
+ * if it is used more than once.
+ *
+ * <p>For the purposes of airtight memory accounting, the reservation must be close()d
+ * whether it is used or not.
+ */
+public abstract class AllocationReservation implements AutoCloseable {
+  private int nBytes = 0;
+  private boolean used = false;
+  private boolean closed = false;
+
+  /**
+   * Constructor. Prevent construction except by derived classes.
+   *
+   * <p>The expectation is that the derived class will be a non-static inner
+   * class in an allocator.
+   */
+  protected AllocationReservation() {
+  }
+
+  /**
+   * Add to the current reservation.
+   *
+   * <p>Adding may fail if the allocator is not allowed to consume any more space.
+   *
+   * @param nBytes the number of bytes to add
+   * @return true if the addition is possible, false otherwise
+   * @throws IllegalStateException if called after buffer() is used to allocate the reservation
+   */
+  public boolean add(final int nBytes) {
+    Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
+    Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
+    Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
+
+    if (!reserve(nBytes)) {
+      return false;
+    }
+
+    this.nBytes += nBytes;
+    return true;
+  }
+
+  /**
+   * Requests a reservation of additional space.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the amount to reserve
+   * @return true if the reservation can be satisfied, false otherwise
+   */
+  protected abstract boolean reserve(int nBytes);
+
+  /**
+   * Allocate a buffer whose size is the total of all the add()s made.
+   *
+   * <p>The allocation request can still fail, even if the amount of space
+   * requested is available, if the allocation cannot be made contiguously.
+   *
+   * @return the buffer, or null, if the request cannot be satisfied
+   * @throws IllegalStateException if called called more than once
+   */
+  public DrillBuf buffer() {
+    Preconditions.checkState(!closed, "Attempt to allocate after closed");
+    Preconditions.checkState(!used, "Attempt to allocate more than once");
+
+    final DrillBuf drillBuf = allocate(nBytes);
+    used = true;
+    return drillBuf;
+  }
+
+  /**
+   * Allocate the a buffer of the requested size.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the size of the buffer requested
+   * @return the buffer, or null, if the request cannot be satisfied
+   */
+  protected abstract DrillBuf allocate(int nBytes);
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    if (!used) {
+      releaseReservation(nBytes);
+    }
+
+    closed = true;
+  }
+
+  /**
+   * Return the reservation back to the allocator without having used it.
+   *
+   * @param nBytes the size of the reservation
+   */
+  protected abstract void releaseReservation(int nBytes);
+
+  /**
+   * Get the current size of the reservation (the sum of all the add()s).
+   *
+   * @return size of the current reservation
+   */
+  public int getSize() {
+    return nBytes;
+  }
+
+  /**
+   * Return whether or not the reservation has been used.
+   *
+   * @return whether or not the reservation has been used
+   */
+  public boolean isUsed() {
+    return used;
+  }
+
+  /**
+   * Return whether or not the reservation has been closed.
+   *
+   * @return whether or not the reservation has been closed
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
new file mode 100644
index 0000000..8bf2a99
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Exception thrown when a closed BufferAllocator is used. Note
+ * this is an unchecked exception.
+ *
+ * @param message string associated with the cause
+ */
+@SuppressWarnings("serial")
+public class AllocatorClosedException extends RuntimeException {
+  public AllocatorClosedException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
new file mode 100644
index 0000000..f2d3df9
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+/**
+ * This interface provides a means for allocator owners to inject services
+ * required by allocators, as well as to identify themselves for debugging purposes.
+ * Identification is done by overriding the implementation of
+ * {#link {@link Object#toString()}.
+ */
+public interface AllocatorOwner {
+  /**
+   * Get the current ExecutionControls from the allocator's owner.
+   *
+   * @return the current execution controls; may return null if this isn't
+   *   possible
+   */
+  ExecutionControls getExecutionControls();
+
+  @Deprecated // Only for TopLevelAllocator and its friends.
+  FragmentContext getFragmentContext();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
new file mode 100644
index 0000000..00d8c4f
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * JMX bean interface for global allocator statistics.
+ */
+// TODO use Stats infrastructure instead of JMX beans
+public interface AllocatorsStatsMXBean {
+  /**
+   * Get the maximum amount of direct memory that can be used.
+   *
+   * <p>This is determined by what is available, or by the drillbit
+   * configuration, if it specifies a value.</p>
+   *
+   * @return the amount of direct memory that can be used
+   */
+  public long getMaxDirectMemory();
+}