You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:33 UTC

[10/13] drill git commit: DRILL-4134: Allocator Improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
deleted file mode 100644
index b8d0fb2..0000000
--- a/exec/memory/base/src/main/java/io/netty/buffer/FakeAllocator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.netty.buffer;
-
-import org.apache.drill.exec.memory.Accountor;
-import org.apache.drill.exec.memory.AllocationReservation;
-import org.apache.drill.exec.memory.AllocatorOwner;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.util.Pointer;
-
-class FakeAllocator implements BufferAllocator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FakeAllocator.class);
-
-
-  public static final Accountor FAKE_ACCOUNTOR = new FakeAccountor();
-  public static final BufferAllocator FAKE_ALLOCATOR = new FakeAllocator();
-
-  @Override
-  public DrillBuf buffer(int size) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public DrillBuf buffer(int minSize, int maxSize) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ByteBufAllocator getUnderlyingAllocator() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
-                                           boolean applyFragmentLimit)
-      throws OutOfMemoryException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public DrillBuf getEmpty() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean takeOwnership(DrillBuf buf) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setFragmentLimit(long l) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getFragmentLimit(){
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public long getAllocatedMemory() {
-    return 0;
-  }
-
-  @Override
-  public long getPeakMemoryAllocation() {
-    return 0;
-  }
-
-  static class FakeAccountor extends Accountor {
-
-    public FakeAccountor() {
-      super(null, false, null, null, 0, 0, true);
-    }
-
-    @Override
-    public boolean transferTo(Accountor target, DrillBuf buf, long size) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getAvailable() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getCapacity() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getAllocation() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean reserve(long size) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean forceAdditionalReservation(long size) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void reserved(long expected, DrillBuf buf) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void releasePartial(DrillBuf buf, long size) {
-
-    }
-
-    @Override
-    public void release(DrillBuf buf, long size) {
-
-    }
-
-    @Override
-    public void close() {
-
-    }
-  }
-
-  @Override
-  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
-      long initReservation, long maxAllocation, int flags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getId() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public AllocationReservation newReservation() {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java b/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
index f1d4842..5f5e904 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -17,151 +17,23 @@
  */
 package io.netty.buffer;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * This is basically a complete copy of DuplicatedByteBuf. We copy because we can't override the release methods to keep
- * global track of created Large Buffers.
+ * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
  */
-public class LargeBuffer extends AbstractByteBuf {
+public class LargeBuffer extends MutableWrappedByteBuf {
 
   private final AtomicLong hugeBufferSize;
   private final AtomicLong hugeBufferCount;
 
-  @Override
-  public ByteBuffer nioBuffer(int index, int length) {
-    return unwrap().nioBuffer(index, length);
-  }
-
-  private final ByteBuf buffer;
   private final int initCap;
 
   public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
-    super(buffer.maxCapacity());
+    super(buffer);
     initCap = buffer.capacity();
     this.hugeBufferCount = hugeBufferCount;
     this.hugeBufferSize = hugeBufferSize;
-
-    if (buffer instanceof LargeBuffer) {
-      this.buffer = ((LargeBuffer) buffer).buffer;
-    } else {
-      this.buffer = buffer;
-    }
-
-    setIndex(buffer.readerIndex(), buffer.writerIndex());
-  }
-
-  @Override
-  public ByteBuf unwrap() {
-    return buffer;
-  }
-
-  @Override
-  public ByteBufAllocator alloc() {
-    return buffer.alloc();
-  }
-
-  @Override
-  public ByteOrder order() {
-    return buffer.order();
-  }
-
-  @Override
-  public boolean isDirect() {
-    return buffer.isDirect();
-  }
-
-  @Override
-  public int capacity() {
-    return buffer.capacity();
-  }
-
-  @Override
-  public ByteBuf capacity(int newCapacity) {
-    buffer.capacity(newCapacity);
-    return this;
-  }
-
-  @Override
-  public boolean hasArray() {
-    return buffer.hasArray();
-  }
-
-  @Override
-  public byte[] array() {
-    return buffer.array();
-  }
-
-  @Override
-  public int arrayOffset() {
-    return buffer.arrayOffset();
-  }
-
-  @Override
-  public boolean hasMemoryAddress() {
-    return buffer.hasMemoryAddress();
-  }
-
-  @Override
-  public long memoryAddress() {
-    return buffer.memoryAddress();
-  }
-
-  @Override
-  public byte getByte(int index) {
-    return _getByte(index);
-  }
-
-  @Override
-  protected byte _getByte(int index) {
-    return buffer.getByte(index);
-  }
-
-  @Override
-  public short getShort(int index) {
-    return _getShort(index);
-  }
-
-  @Override
-  protected short _getShort(int index) {
-    return buffer.getShort(index);
-  }
-
-  @Override
-  public int getUnsignedMedium(int index) {
-    return _getUnsignedMedium(index);
-  }
-
-  @Override
-  protected int _getUnsignedMedium(int index) {
-    return buffer.getUnsignedMedium(index);
-  }
-
-  @Override
-  public int getInt(int index) {
-    return _getInt(index);
-  }
-
-  @Override
-  protected int _getInt(int index) {
-    return buffer.getInt(index);
-  }
-
-  @Override
-  public long getLong(int index) {
-    return _getLong(index);
-  }
-
-  @Override
-  protected long _getLong(int index) {
-    return buffer.getLong(index);
   }
 
   @Override
@@ -170,169 +42,6 @@ public class LargeBuffer extends AbstractByteBuf {
   }
 
   @Override
-  public ByteBuf slice(int index, int length) {
-    return new SlicedByteBuf(this, index, length);
-  }
-
-  @Override
-  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
-    buffer.getBytes(index, dst, dstIndex, length);
-    return this;
-  }
-
-  @Override
-  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
-    buffer.getBytes(index, dst, dstIndex, length);
-    return this;
-  }
-
-  @Override
-  public ByteBuf getBytes(int index, ByteBuffer dst) {
-    buffer.getBytes(index, dst);
-    return this;
-  }
-
-  @Override
-  public ByteBuf setByte(int index, int value) {
-    _setByte(index, value);
-    return this;
-  }
-
-  @Override
-  protected void _setByte(int index, int value) {
-    buffer.setByte(index, value);
-  }
-
-  @Override
-  public ByteBuf setShort(int index, int value) {
-    _setShort(index, value);
-    return this;
-  }
-
-  @Override
-  protected void _setShort(int index, int value) {
-    buffer.setShort(index, value);
-  }
-
-  @Override
-  public ByteBuf setMedium(int index, int value) {
-    _setMedium(index, value);
-    return this;
-  }
-
-  @Override
-  protected void _setMedium(int index, int value) {
-    buffer.setMedium(index, value);
-  }
-
-  @Override
-  public ByteBuf setInt(int index, int value) {
-    _setInt(index, value);
-    return this;
-  }
-
-  @Override
-  protected void _setInt(int index, int value) {
-    buffer.setInt(index, value);
-  }
-
-  @Override
-  public ByteBuf setLong(int index, long value) {
-    _setLong(index, value);
-    return this;
-  }
-
-  @Override
-  protected void _setLong(int index, long value) {
-    buffer.setLong(index, value);
-  }
-
-  @Override
-  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
-    buffer.setBytes(index, src, srcIndex, length);
-    return this;
-  }
-
-  @Override
-  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
-    buffer.setBytes(index, src, srcIndex, length);
-    return this;
-  }
-
-  @Override
-  public ByteBuf setBytes(int index, ByteBuffer src) {
-    buffer.setBytes(index, src);
-    return this;
-  }
-
-  @Override
-  public ByteBuf getBytes(int index, OutputStream out, int length)
-      throws IOException {
-    buffer.getBytes(index, out, length);
-    return this;
-  }
-
-  @Override
-  public int getBytes(int index, GatheringByteChannel out, int length)
-      throws IOException {
-    return buffer.getBytes(index, out, length);
-  }
-
-  @Override
-  public int setBytes(int index, InputStream in, int length)
-      throws IOException {
-    return buffer.setBytes(index, in, length);
-  }
-
-  @Override
-  public int setBytes(int index, ScatteringByteChannel in, int length)
-      throws IOException {
-    return buffer.setBytes(index, in, length);
-  }
-
-  @Override
-  public int nioBufferCount() {
-    return buffer.nioBufferCount();
-  }
-
-  @Override
-  public ByteBuffer[] nioBuffers(int index, int length) {
-    return buffer.nioBuffers(index, length);
-  }
-
-  @Override
-  public ByteBuffer internalNioBuffer(int index, int length) {
-    return nioBuffer(index, length);
-  }
-
-  @Override
-  public int forEachByte(int index, int length, ByteBufProcessor processor) {
-    return buffer.forEachByte(index, length, processor);
-  }
-
-  @Override
-  public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
-    return buffer.forEachByteDesc(index, length, processor);
-  }
-
-  @Override
-  public final int refCnt() {
-    return unwrap().refCnt();
-  }
-
-  @Override
-  public final ByteBuf retain() {
-    unwrap().retain();
-    return this;
-  }
-
-  @Override
-  public final ByteBuf retain(int increment) {
-    unwrap().retain(increment);
-    return this;
-  }
-
-  @Override
   public boolean release() {
     return release(1);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
new file mode 100644
index 0000000..5709473
--- /dev/null
+++ b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override some behaviors and make
+ * buffer mutable.
+ */
+abstract class MutableWrappedByteBuf extends AbstractByteBuf {
+
+  @Override
+  public ByteBuffer nioBuffer(int index, int length) {
+    return unwrap().nioBuffer(index, length);
+  }
+
+  ByteBuf buffer;
+
+  public MutableWrappedByteBuf(ByteBuf buffer) {
+    super(buffer.maxCapacity());
+
+    if (buffer instanceof MutableWrappedByteBuf) {
+      this.buffer = ((MutableWrappedByteBuf) buffer).buffer;
+    } else {
+      this.buffer = buffer;
+    }
+
+    setIndex(buffer.readerIndex(), buffer.writerIndex());
+  }
+
+  @Override
+  public ByteBuf unwrap() {
+    return buffer;
+  }
+
+  @Override
+  public ByteBufAllocator alloc() {
+    return buffer.alloc();
+  }
+
+  @Override
+  public ByteOrder order() {
+    return buffer.order();
+  }
+
+  @Override
+  public boolean isDirect() {
+    return buffer.isDirect();
+  }
+
+  @Override
+  public int capacity() {
+    return buffer.capacity();
+  }
+
+  @Override
+  public ByteBuf capacity(int newCapacity) {
+    buffer.capacity(newCapacity);
+    return this;
+  }
+
+  @Override
+  public boolean hasArray() {
+    return buffer.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return buffer.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return buffer.arrayOffset();
+  }
+
+  @Override
+  public boolean hasMemoryAddress() {
+    return buffer.hasMemoryAddress();
+  }
+
+  @Override
+  public long memoryAddress() {
+    return buffer.memoryAddress();
+  }
+
+  @Override
+  public byte getByte(int index) {
+    return _getByte(index);
+  }
+
+  @Override
+  protected byte _getByte(int index) {
+    return buffer.getByte(index);
+  }
+
+  @Override
+  public short getShort(int index) {
+    return _getShort(index);
+  }
+
+  @Override
+  protected short _getShort(int index) {
+    return buffer.getShort(index);
+  }
+
+  @Override
+  public int getUnsignedMedium(int index) {
+    return _getUnsignedMedium(index);
+  }
+
+  @Override
+  protected int _getUnsignedMedium(int index) {
+    return buffer.getUnsignedMedium(index);
+  }
+
+  @Override
+  public int getInt(int index) {
+    return _getInt(index);
+  }
+
+  @Override
+  protected int _getInt(int index) {
+    return buffer.getInt(index);
+  }
+
+  @Override
+  public long getLong(int index) {
+    return _getLong(index);
+  }
+
+  @Override
+  protected long _getLong(int index) {
+    return buffer.getLong(index);
+  }
+
+  @Override
+  public abstract ByteBuf copy(int index, int length);
+
+  @Override
+  public ByteBuf slice(int index, int length) {
+    return new SlicedByteBuf(this, index, length);
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+    buffer.getBytes(index, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuffer dst) {
+    buffer.getBytes(index, dst);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    _setByte(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setByte(int index, int value) {
+    buffer.setByte(index, value);
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    _setShort(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setShort(int index, int value) {
+    buffer.setShort(index, value);
+  }
+
+  @Override
+  public ByteBuf setMedium(int index, int value) {
+    _setMedium(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setMedium(int index, int value) {
+    buffer.setMedium(index, value);
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    _setInt(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setInt(int index, int value) {
+    buffer.setInt(index, value);
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    _setLong(index, value);
+    return this;
+  }
+
+  @Override
+  protected void _setLong(int index, long value) {
+    buffer.setLong(index, value);
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+    buffer.setBytes(index, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuffer src) {
+    buffer.setBytes(index, src);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, OutputStream out, int length)
+      throws IOException {
+    buffer.getBytes(index, out, length);
+    return this;
+  }
+
+  @Override
+  public int getBytes(int index, GatheringByteChannel out, int length)
+      throws IOException {
+    return buffer.getBytes(index, out, length);
+  }
+
+  @Override
+  public int setBytes(int index, InputStream in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int setBytes(int index, ScatteringByteChannel in, int length)
+      throws IOException {
+    return buffer.setBytes(index, in, length);
+  }
+
+  @Override
+  public int nioBufferCount() {
+    return buffer.nioBufferCount();
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers(int index, int length) {
+    return buffer.nioBuffers(index, length);
+  }
+
+  @Override
+  public ByteBuffer internalNioBuffer(int index, int length) {
+    return nioBuffer(index, length);
+  }
+
+  @Override
+  public int forEachByte(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByte(index, length, processor);
+  }
+
+  @Override
+  public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+    return buffer.forEachByteDesc(index, length, processor);
+  }
+
+  @Override
+  public final int refCnt() {
+    return unwrap().refCnt();
+  }
+
+  @Override
+  public final ByteBuf retain() {
+    unwrap().retain();
+    return this;
+  }
+
+  @Override
+  public final ByteBuf retain(int increment) {
+    unwrap().retain(increment);
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    boolean released = unwrap().release(decrement);
+    return released;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 2fc1bd0..47dbf59 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -23,193 +23,227 @@ import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 
-public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
-
+/**
+ * The base allocator that we use for all of Drill's memory management. Returns UnsafeDirectLittleEndian buffers.
+ */
+public class PooledByteBufAllocatorL {
   private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+
   private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
 
+
   private static final String METRIC_PREFIX = "drill.allocator.";
+
   private final MetricRegistry registry;
   private final AtomicLong hugeBufferSize = new AtomicLong(0);
   private final AtomicLong hugeBufferCount = new AtomicLong(0);
   private final AtomicLong normalBufferSize = new AtomicLong(0);
   private final AtomicLong normalBufferCount = new AtomicLong(0);
 
-  private final PoolArena<ByteBuffer>[] directArenas;
-  private final MemoryStatusThread statusThread;
-  private final Histogram largeBuffersHist;
-  private final Histogram normalBuffersHist;
+  public final InnerAllocator allocator;
+  public final UnsafeDirectLittleEndian empty;
 
   public PooledByteBufAllocatorL(MetricRegistry registry) {
-    super(true);
     this.registry = registry;
+    allocator = new InnerAllocator();
+    empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+  }
+
+  public UnsafeDirectLittleEndian allocate(int size) {
     try {
-      Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
-      f.setAccessible(true);
-      this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+      return allocator.directBuffer(size, size);
+    } catch (OutOfMemoryError e) {
+      throw new OutOfMemoryException("Failure allocating buffer.", e);
     }
 
-    if (memoryLogger.isTraceEnabled()) {
-      statusThread = new MemoryStatusThread();
-      statusThread.start();
-    } else {
-      statusThread = null;
-    }
-    removeOldMetrics();
+  }
 
-    registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return normalBufferSize.get();
-      }
-    });
+  public int getChunkSize() {
+    return allocator.chunkSize;
+  }
 
-    registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return normalBufferCount.get();
-      }
-    });
+  private class InnerAllocator extends PooledByteBufAllocator {
 
-    registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return hugeBufferSize.get();
-      }
-    });
 
-    registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return hugeBufferCount.get();
-      }
-    });
+    private final PoolArena<ByteBuffer>[] directArenas;
+    private final MemoryStatusThread statusThread;
+    private final Histogram largeBuffersHist;
+    private final Histogram normalBuffersHist;
+    private final int chunkSize;
 
-    largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
-    normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+    public InnerAllocator() {
+      super(true);
 
-  }
+      try {
+        Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+        f.setAccessible(true);
+        this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+      } catch (Exception e) {
+        throw new RuntimeException("Failure while initializing allocator.  Unable to retrieve direct arenas field.", e);
+      }
 
-  private synchronized void removeOldMetrics() {
-    registry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith("drill.allocator.");
+      this.chunkSize = directArenas[0].chunkSize;
+
+      if (memoryLogger.isTraceEnabled()) {
+        statusThread = new MemoryStatusThread();
+        statusThread.start();
+      } else {
+        statusThread = null;
       }
+      removeOldMetrics();
 
-    });
-  }
+      registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return normalBufferSize.get();
+        }
+      });
 
-  @Override
-  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
-    throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
-  }
+      registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return normalBufferCount.get();
+        }
+      });
 
-  @Override
-  protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity, int maxCapacity) {
-    PoolThreadCache cache = threadCache.get();
-    PoolArena<ByteBuffer> directArena = cache.directArena;
+      registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return hugeBufferSize.get();
+        }
+      });
 
-    if (directArena != null) {
+      registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          return hugeBufferCount.get();
+        }
+      });
 
-      if (initialCapacity > directArena.chunkSize) {
-        // This is beyond chunk size so we'll allocate separately.
-        ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+      largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+      normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
 
-        hugeBufferCount.incrementAndGet();
-        hugeBufferSize.addAndGet(buf.capacity());
-        largeBuffersHist.update(buf.capacity());
-        // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
-        return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+    }
 
-      } else {
-        // within chunk, use arena.
-        ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
-        if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
-          fail();
+
+    private synchronized void removeOldMetrics() {
+      registry.removeMatching(new MetricFilter() {
+        @Override
+        public boolean matches(String name, Metric metric) {
+          return name.startsWith("drill.allocator.");
         }
 
-        normalBuffersHist.update(buf.capacity());
-        if (ASSERT_ENABLED) {
-          normalBufferSize.addAndGet(buf.capacity());
-          normalBufferCount.incrementAndGet();
+      });
+    }
+
+    private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
+      PoolThreadCache cache = threadCache.get();
+      PoolArena<ByteBuffer> directArena = cache.directArena;
+
+      if (directArena != null) {
+
+        if (initialCapacity > directArena.chunkSize) {
+          // This is beyond chunk size so we'll allocate separately.
+          ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+          hugeBufferCount.incrementAndGet();
+          hugeBufferSize.addAndGet(buf.capacity());
+          largeBuffersHist.update(buf.capacity());
+          // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+          return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+
+        } else {
+          // within chunk, use arena.
+          ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+          if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+            fail();
+          }
+
+          normalBuffersHist.update(buf.capacity());
+          if (ASSERT_ENABLED) {
+            normalBufferSize.addAndGet(buf.capacity());
+            normalBufferCount.incrementAndGet();
+          }
+
+          return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
+              normalBufferSize);
         }
 
-        return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
+      } else {
+        throw fail();
       }
-
-    } else {
-      throw fail();
     }
-  }
-
-  private UnsupportedOperationException fail() {
-    return new UnsupportedOperationException(
-        "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
-  }
 
+    private UnsupportedOperationException fail() {
+      return new UnsupportedOperationException(
+          "Drill requries that the JVM used supports access sun.misc.Unsafe.  This platform didn't provide that functionality.");
+    }
 
-  @Override
-  public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
+    public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
       if (initialCapacity == 0 && maxCapacity == 0) {
-          newDirectBuffer(initialCapacity, maxCapacity);
+        newDirectBuffer(initialCapacity, maxCapacity);
       }
       validate(initialCapacity, maxCapacity);
-      return newDirectBuffer(initialCapacity, maxCapacity);
-  }
+      return newDirectBufferL(initialCapacity, maxCapacity);
+    }
 
-  @Override
-  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
-    throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
-  }
+    @Override
+    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+      throw new UnsupportedOperationException("Drill doesn't support using heap buffers.");
+    }
 
 
-  private static void validate(int initialCapacity, int maxCapacity) {
-    if (initialCapacity < 0) {
+    private void validate(int initialCapacity, int maxCapacity) {
+      if (initialCapacity < 0) {
         throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
-    }
-    if (initialCapacity > maxCapacity) {
+      }
+      if (initialCapacity > maxCapacity) {
         throw new IllegalArgumentException(String.format(
-                "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
-                initialCapacity, maxCapacity));
+            "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
+            initialCapacity, maxCapacity));
+      }
     }
-  }
 
-  private class MemoryStatusThread extends Thread {
+    private class MemoryStatusThread extends Thread {
 
-    public MemoryStatusThread() {
-      super("memory-status-logger");
-      this.setDaemon(true);
-      this.setName("allocation.logger");
-    }
+      public MemoryStatusThread() {
+        super("memory-status-logger");
+        this.setDaemon(true);
+        this.setName("allocation.logger");
+      }
 
-    @Override
-    public void run() {
-      while (true) {
-        memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
-        try {
-          Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
-        } catch (InterruptedException e) {
-          return;
-        }
+      @Override
+      public void run() {
+        while (true) {
+          memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+          try {
+            Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+          } catch (InterruptedException e) {
+            return;
+          }
 
+        }
       }
-    }
 
-  }
+    }
 
-  public void checkAndReset() {
-    if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
+    public String toString() {
       StringBuilder buf = new StringBuilder();
+      buf.append(directArenas.length);
+      buf.append(" direct arena(s):");
+      buf.append(StringUtil.NEWLINE);
+      for (PoolArena<ByteBuffer> a : directArenas) {
+        buf.append(a);
+      }
+
       buf.append("Large buffers outstanding: ");
       buf.append(hugeBufferCount.get());
       buf.append(" totaling ");
@@ -221,35 +255,10 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
       buf.append(" totaling ");
       buf.append(normalBufferSize.get());
       buf.append(" bytes.");
-      hugeBufferCount.set(0);
-      normalBufferCount.set(0);
-      hugeBufferSize.set(0);
-      normalBufferSize.set(0);
-      throw new IllegalStateException(buf.toString());
+      return buf.toString();
     }
-  }
 
-  public String toString() {
-    StringBuilder buf = new StringBuilder();
-    buf.append(directArenas.length);
-    buf.append(" direct arena(s):");
-    buf.append(StringUtil.NEWLINE);
-    for (PoolArena<ByteBuffer> a : directArenas) {
-      buf.append(a);
-    }
 
-    buf.append("Large buffers outstanding: ");
-    buf.append(this.hugeBufferCount.get());
-    buf.append(" totaling ");
-    buf.append(this.hugeBufferSize.get());
-    buf.append(" bytes.");
-    buf.append('\n');
-    buf.append("Normal buffers outstanding: ");
-    buf.append(this.normalBufferCount.get());
-    buf.append(" totaling ");
-    buf.append(this.normalBufferSize.get());
-    buf.append(" bytes.");
-    return buf.toString();
   }
 
   public static final boolean ASSERT_ENABLED;
@@ -259,4 +268,5 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
     assert isAssertEnabled = true;
     ASSERT_ENABLED = isAssertEnabled;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 419aef3..12e9907 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -23,92 +23,46 @@ import io.netty.util.internal.PlatformDependent;
 import java.nio.ByteOrder;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * The underlying class we use for little-endian access to memory. Is used underneath DrillBufs to abstract away the
+ * Netty classes and underlying Netty memory management.
+ */
 public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
   private final AbstractByteBuf wrapped;
   private final long memoryAddress;
-  private static final boolean TRACK_BUFFERS = false;
-  private AtomicLong bufferCount;
-  private AtomicLong bufferSize;
-  private long initCap = -1;
 
-  private final static IdentityHashMap<UnsafeDirectLittleEndian, StackTrace> bufferMap = new IdentityHashMap<>();
+  private final AtomicLong bufferCount;
+  private final AtomicLong bufferSize;
+  private final long initCap;
 
-  @Override
-  public boolean release() {
-    return release(1);
+  UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
+    this(buf, true, null, null);
   }
 
-  @Override
-  public boolean release(int decrement) {
-    boolean released = super.release(decrement);
-    if (TRACK_BUFFERS) {
-      if (released) {
-        final Object object;
-        synchronized (bufferMap) {
-          object = bufferMap.remove(this);
-        }
-        if (object == null) {
-          throw new IllegalStateException("no such buffer");
-        }
-
-        if (initCap != -1) {
-          bufferCount.decrementAndGet();
-          bufferSize.addAndGet(-initCap);
-        }
-      }
-    }
-
-    return released;
+  UnsafeDirectLittleEndian(LargeBuffer buf) {
+    this(buf, true, null, null);
   }
 
+  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
+    this(buf, true, bufferCount, bufferSize);
 
-  public static int getBufferCount() {
-    return bufferMap.size();
-  }
-
-  public static void releaseBuffers() {
-    synchronized(bufferMap) {
-      final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
-      final LinkedList<UnsafeDirectLittleEndian> bufferList = new LinkedList<>(bufferSet);
-      while(!bufferList.isEmpty()) {
-        final UnsafeDirectLittleEndian udle = bufferList.removeFirst();
-        udle.release(udle.refCnt());
-      }
-    }
   }
 
-  public static void logBuffers(final Logger logger) {
-    synchronized (bufferMap) {
-      int count = 0;
-      final Set<UnsafeDirectLittleEndian> bufferSet = bufferMap.keySet();
-      for (final UnsafeDirectLittleEndian udle : bufferSet) {
-        final StackTrace stackTrace = bufferMap.get(udle);
-        ++count;
-        logger.debug("#" + count + " active buffer allocated at\n" + stackTrace);
-      }
+  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
+    super(buf);
+    if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+      throw new IllegalStateException("Drill only runs on LittleEndian systems.");
     }
-  }
-  UnsafeDirectLittleEndian(LargeBuffer buf) {
-    this(buf, true);
-  }
 
-  UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
-    this(buf, true);
     this.bufferCount = bufferCount;
     this.bufferSize = bufferSize;
 
     // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
-    this.initCap = ASSERT_ENABLED ? capacity() : -1;
-  }
+    this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;
 
-  private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
-    super(buf);
-    if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
-      throw new IllegalStateException("Drill only runs on LittleEndian systems.");
-    }
-    wrapped = buf;
-    memoryAddress = buf.memoryAddress();
+    this.wrapped = buf;
+    this.memoryAddress = buf.memoryAddress();
   }
     private long addr(int index) {
         return memoryAddress + index;
@@ -147,142 +101,140 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   }
 
   @Override
-    public double getDouble(int index) {
-        return Double.longBitsToDouble(getLong(index));
-    }
+  public double getDouble(int index) {
+    return Double.longBitsToDouble(getLong(index));
+  }
 
-    @Override
-    public char getChar(int index) {
-        return (char) getShort(index);
-    }
+  @Override
+  public char getChar(int index) {
+    return (char) getShort(index);
+  }
 
-    @Override
-    public long getUnsignedInt(int index) {
-        return getInt(index) & 0xFFFFFFFFL;
-    }
+  @Override
+  public long getUnsignedInt(int index) {
+    return getInt(index) & 0xFFFFFFFFL;
+  }
 
-    @Override
-    public int getInt(int index) {
-//        wrapped.checkIndex(index, 4);
-        int v = PlatformDependent.getInt(addr(index));
-        return v;
-    }
+  @Override
+  public int getInt(int index) {
+    int v = PlatformDependent.getInt(addr(index));
+    return v;
+  }
 
-    @Override
-    public int getUnsignedShort(int index) {
-        return getShort(index) & 0xFFFF;
-    }
+  @Override
+  public int getUnsignedShort(int index) {
+    return getShort(index) & 0xFFFF;
+  }
 
-    @Override
-    public short getShort(int index) {
-//        wrapped.checkIndex(index, 2);
-        short v = PlatformDependent.getShort(addr(index));
-        return v;
-    }
+  @Override
+  public short getShort(int index) {
+    short v = PlatformDependent.getShort(addr(index));
+    return v;
+  }
 
-    @Override
-    public ByteBuf setShort(int index, int value) {
-        wrapped.checkIndex(index, 2);
-        _setShort(index, value);
-        return this;
-    }
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    wrapped.checkIndex(index, 2);
+    _setShort(index, value);
+    return this;
+  }
 
-    @Override
-    public ByteBuf setInt(int index, int value) {
-        wrapped.checkIndex(index, 4);
-        _setInt(index, value);
-        return this;
-    }
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    wrapped.checkIndex(index, 4);
+    _setInt(index, value);
+    return this;
+  }
 
-    @Override
-    public ByteBuf setLong(int index, long value) {
-        wrapped.checkIndex(index, 8);
-        _setLong(index, value);
-        return this;
-    }
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    wrapped.checkIndex(index, 8);
+    _setLong(index, value);
+    return this;
+  }
 
-    @Override
-    public ByteBuf setChar(int index, int value) {
-        setShort(index, value);
-        return this;
-    }
+  @Override
+  public ByteBuf setChar(int index, int value) {
+    setShort(index, value);
+    return this;
+  }
 
-    @Override
-    public ByteBuf setFloat(int index, float value) {
-        setInt(index, Float.floatToRawIntBits(value));
-        return this;
-    }
+  @Override
+  public ByteBuf setFloat(int index, float value) {
+    setInt(index, Float.floatToRawIntBits(value));
+    return this;
+  }
 
-    @Override
-    public ByteBuf setDouble(int index, double value) {
-        setLong(index, Double.doubleToRawLongBits(value));
-        return this;
-    }
+  @Override
+  public ByteBuf setDouble(int index, double value) {
+    setLong(index, Double.doubleToRawLongBits(value));
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeShort(int value) {
-        wrapped.ensureWritable(2);
-        _setShort(wrapped.writerIndex, value);
-        wrapped.writerIndex += 2;
-        return this;
-    }
+  @Override
+  public ByteBuf writeShort(int value) {
+    wrapped.ensureWritable(2);
+    _setShort(wrapped.writerIndex, value);
+    wrapped.writerIndex += 2;
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeInt(int value) {
-        wrapped.ensureWritable(4);
-        _setInt(wrapped.writerIndex, value);
-        wrapped.writerIndex += 4;
-        return this;
-    }
+  @Override
+  public ByteBuf writeInt(int value) {
+    wrapped.ensureWritable(4);
+    _setInt(wrapped.writerIndex, value);
+    wrapped.writerIndex += 4;
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeLong(long value) {
-        wrapped.ensureWritable(8);
-        _setLong(wrapped.writerIndex, value);
-        wrapped.writerIndex += 8;
-        return this;
-    }
+  @Override
+  public ByteBuf writeLong(long value) {
+    wrapped.ensureWritable(8);
+    _setLong(wrapped.writerIndex, value);
+    wrapped.writerIndex += 8;
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeChar(int value) {
-        writeShort(value);
-        return this;
-    }
+  @Override
+  public ByteBuf writeChar(int value) {
+    writeShort(value);
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeFloat(float value) {
-        writeInt(Float.floatToRawIntBits(value));
-        return this;
-    }
+  @Override
+  public ByteBuf writeFloat(float value) {
+    writeInt(Float.floatToRawIntBits(value));
+    return this;
+  }
 
-    @Override
-    public ByteBuf writeDouble(double value) {
-        writeLong(Double.doubleToRawLongBits(value));
-        return this;
-    }
+  @Override
+  public ByteBuf writeDouble(double value) {
+    writeLong(Double.doubleToRawLongBits(value));
+    return this;
+  }
 
-    private void _setShort(int index, int value) {
-        PlatformDependent.putShort(addr(index), (short) value);
-    }
+  private void _setShort(int index, int value) {
+    PlatformDependent.putShort(addr(index), (short) value);
+  }
 
-    private void _setInt(int index, int value) {
-        PlatformDependent.putInt(addr(index), value);
-    }
+  private void _setInt(int index, int value) {
+    PlatformDependent.putInt(addr(index), value);
+  }
 
-    private void _setLong(int index, long value) {
-        PlatformDependent.putLong(addr(index), value);
-    }
+  private void _setLong(int index, long value) {
+    PlatformDependent.putLong(addr(index), value);
+  }
 
-    @Override
-    public byte getByte(int index) {
-      return PlatformDependent.getByte(addr(index));
-    }
+  @Override
+  public byte getByte(int index) {
+    return PlatformDependent.getByte(addr(index));
+  }
 
-    @Override
-    public ByteBuf setByte(int index, int value) {
-      PlatformDependent.putByte(addr(index), (byte) value);
-      return this;
-    }
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    PlatformDependent.putByte(addr(index), (byte) value);
+    return this;
+  }
 
   @Override
   public boolean release() {
@@ -291,8 +243,8 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
 
   @Override
   public boolean release(int decrement) {
-    boolean released = super.release(decrement);
-    if (released && initCap != -1) {
+    final boolean released = super.release(decrement);
+    if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
       bufferCount.decrementAndGet();
       bufferSize.addAndGet(-initCap);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
new file mode 100644
index 0000000..8bcf6a0
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountant.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Provides a concurrent way to manage account for memory usage without locking. Used as basis for Allocators. All
+ * operations are threadsafe (except for close).
+ */
+@ThreadSafe
+class Accountant implements AutoCloseable {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountant.class);
+
+  /**
+   * The parent allocator
+   */
+  protected final Accountant parent;
+
+  /**
+   * The amount of memory reserved for this allocator. Releases below this amount of memory will not be returned to the
+   * parent Accountant until this Accountant is closed.
+   */
+  protected final long reservation;
+
+  private final AtomicLong peakAllocation = new AtomicLong();
+
+  /**
+   * Maximum local memory that can be held. This can be externally updated. Changing it won't cause past memory to
+   * change but will change responses to future allocation efforts
+   */
+  private final AtomicLong allocationLimit = new AtomicLong();
+
+  /**
+   * Currently allocated amount of memory;
+   */
+  private final AtomicLong locallyHeldMemory = new AtomicLong();
+
+  public Accountant(Accountant parent, long reservation, long maxAllocation) {
+    Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
+    Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
+    Preconditions.checkArgument(reservation <= maxAllocation,
+        "The initial reservation size must be <= the maximum allocation.");
+    Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
+
+    this.parent = parent;
+    this.reservation = reservation;
+    this.allocationLimit.set(maxAllocation);
+
+    if (reservation != 0) {
+      // we will allocate a reservation from our parent.
+      final AllocationOutcome outcome = parent.allocateBytes(reservation);
+      if (!outcome.isOk()) {
+        throw new OutOfMemoryException(String.format(
+            "Failure trying to allocate initial reservation for Allocator. "
+                + "Attempted to allocate %d bytes and received an outcome of %s.", reservation, outcome.name()));
+      }
+    }
+  }
+
+  /**
+   * Attempt to allocate the requested amount of memory. Either completely succeeds or completely fails. Constructs a a
+   * log of delta
+   *
+   * If it fails, no changes are made to accounting.
+   *
+   * @param size
+   *          The amount of memory to reserve in bytes.
+   * @return True if the allocation was successful, false if the allocation failed.
+   */
+  AllocationOutcome allocateBytes(long size) {
+    final AllocationOutcome outcome = allocate(size, true, false);
+    if (!outcome.isOk()) {
+      releaseBytes(size);
+    }
+    return outcome;
+  }
+
+  private void updatePeak() {
+    final long currentMemory = locallyHeldMemory.get();
+    while (true) {
+
+      final long previousPeak = peakAllocation.get();
+      if (currentMemory > previousPeak) {
+        if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) {
+          // peak allocation changed underneath us. try again.
+          continue;
+        }
+      }
+
+      // we either succeeded to set peak allocation or we weren't above the previous peak, exit.
+      return;
+    }
+  }
+
+
+  /**
+   * Increase the accounting. Returns whether the allocation fit within limits.
+   *
+   * @param size
+   *          to increase
+   * @return Whether the allocation fit within limits.
+   */
+  boolean forceAllocate(long size) {
+    final AllocationOutcome outcome = allocate(size, true, true);
+    return outcome.isOk();
+  }
+
+  /**
+   * Internal method for allocation. This takes a forced approach to allocation to ensure that we manage reservation
+   * boundary issues consistently. Allocation is always done through the entire tree. The two options that we influence
+   * are whether the allocation should be forced and whether or not the peak memory allocation should be updated. If at
+   * some point during allocation escalation we determine that the allocation is no longer possible, we will continue to
+   * do a complete and consistent allocation but we will stop updating the peak allocation. We do this because we know
+   * that we will be directly unwinding this allocation (and thus never actually making the allocation). If force
+   * allocation is passed, then we continue to update the peak limits since we now know that this allocation will occur
+   * despite our moving past one or more limits.
+   *
+   * @param size
+   *          The size of the allocation.
+   * @param incomingUpdatePeak
+   *          Whether we should update the local peak for this allocation.
+   * @param forceAllocation
+   *          Whether we should force the allocation.
+   * @return The outcome of the allocation.
+   */
+  private AllocationOutcome allocate(final long size, final boolean incomingUpdatePeak, final boolean forceAllocation) {
+    final long newLocal = locallyHeldMemory.addAndGet(size);
+    final long beyondReservation = newLocal - reservation;
+    final boolean beyondLimit = newLocal > allocationLimit.get();
+    final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit);
+
+    AllocationOutcome parentOutcome = AllocationOutcome.SUCCESS;
+    if (beyondReservation > 0 && parent != null) {
+      // we need to get memory from our parent.
+      final long parentRequest = Math.min(beyondReservation, size);
+      parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation);
+    }
+
+    final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL :
+        parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT;
+
+    if (updatePeak) {
+      updatePeak();
+    }
+
+    return finalOutcome;
+  }
+
+  public void releaseBytes(long size) {
+    // reduce local memory. all memory released above reservation should be released up the tree.
+    final long newSize = locallyHeldMemory.addAndGet(-size);
+
+    Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
+
+    final long originalSize = newSize + size;
+    if(originalSize > reservation && parent != null){
+      // we deallocated memory that we should release to our parent.
+      final long possibleAmountToReleaseToParent = originalSize - reservation;
+      final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
+      parent.releaseBytes(actualToReleaseToParent);
+    }
+
+  }
+
+  /**
+   * Set the maximum amount of memory that can be allocated in the this Accountant before failing an allocation.
+   *
+   * @param newLimit
+   *          The limit in bytes.
+   */
+  public void setLimit(long newLimit) {
+    allocationLimit.set(newLimit);
+  }
+
+  public boolean isOverLimit() {
+    return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
+  }
+
+  /**
+   * Close this Accountant. This will release any reservation bytes back to a parent Accountant.
+   */
+  public void close() {
+    // return memory reservation to parent allocator.
+    if (parent != null) {
+      parent.releaseBytes(reservation);
+    }
+  }
+
+  /**
+   * Return the current limit of this Accountant.
+   *
+   * @return Limit in bytes.
+   */
+  public long getLimit() {
+    return allocationLimit.get();
+  }
+
+  /**
+   * Return the current amount of allocated memory that this Accountant is managing accounting for. Note this does not
+   * include reservation memory that hasn't been allocated.
+   *
+   * @return Currently allocate memory in bytes.
+   */
+  public long getAllocatedMemory() {
+    return locallyHeldMemory.get();
+  }
+
+  /**
+   * The peak memory allocated by this Accountant.
+   *
+   * @return The peak allocated memory in bytes.
+   */
+  public long getPeakMemoryAllocation() {
+    return peakAllocation.get();
+  }
+
+  /**
+   * Describes the type of outcome that occurred when trying to account for allocation of memory.
+   */
+  public static enum AllocationOutcome {
+
+    /**
+     * Allocation succeeded.
+     */
+    SUCCESS(true),
+
+    /**
+     * Allocation succeeded but only because the allocator was forced to move beyond a limit.
+     */
+    FORCED_SUCESS(true),
+
+    /**
+     * Allocation failed because the local allocator's limits were exceeded.
+     */
+    FAILED_LOCAL(false),
+
+    /**
+     * Allocation failed because a parent allocator's limits were exceeded.
+     */
+    FAILED_PARENT(false);
+
+    private final boolean ok;
+
+    AllocationOutcome(boolean ok) {
+      this.ok = ok;
+    }
+
+    public boolean isOk() {
+      return ok;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
deleted file mode 100644
index 7014a0b..0000000
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import io.netty.buffer.DrillBuf;
-
-public interface Accountor extends AutoCloseable{
-
-  public boolean transferTo(Accountor target, DrillBuf buf, long size);
-  public boolean transferIn(DrillBuf buf, long size);
-  public long getAvailable();
-  public long getCapacity();
-  public long getAllocation();
-  public long getPeakMemoryAllocation();
-
-  public boolean reserve(long size);
-  public boolean forceAdditionalReservation(long size);
-
-  public void reserved(long expected, DrillBuf buf);
-
-  public void release(DrillBuf buf, long size);
-  public void releasePartial(DrillBuf buf, long size);
-  public long resetFragmentLimits();
-  public void close();
-
-  public void setFragmentLimit(long add);
-  public long getFragmentLimit();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
new file mode 100644
index 0000000..65a1386
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they
+ * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception
+ * if it is used more than once.
+ * <p>
+ * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not.
+ * This is not threadsafe.
+ */
+public interface AllocationReservation extends AutoCloseable {
+
+  /**
+   * Add to the current reservation.
+   *
+   * <p>Adding may fail if the allocator is not allowed to consume any more space.
+   *
+   * @param nBytes the number of bytes to add
+   * @return true if the addition is possible, false otherwise
+   * @throws IllegalStateException if called after buffer() is used to allocate the reservation
+   */
+  boolean add(final int nBytes);
+
+  /**
+   * Requests a reservation of additional space.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the amount to reserve
+   * @return true if the reservation can be satisfied, false otherwise
+   */
+  boolean reserve(int nBytes);
+
+  /**
+   * Allocate a buffer whose size is the total of all the add()s made.
+   *
+   * <p>The allocation request can still fail, even if the amount of space
+   * requested is available, if the allocation cannot be made contiguously.
+   *
+   * @return the buffer, or null, if the request cannot be satisfied
+   * @throws IllegalStateException if called called more than once
+   */
+  DrillBuf allocateBuffer();
+
+  /**
+   * Get the current size of the reservation (the sum of all the add()s).
+   *
+   * @return size of the current reservation
+   */
+  int getSize();
+
+  /**
+   * Return whether or not the reservation has been used.
+   *
+   * @return whether or not the reservation has been used
+   */
+  public boolean isUsed();
+
+  /**
+   * Return whether or not the reservation has been closed.
+   *
+   * @return whether or not the reservation has been closed
+   */
+  public boolean isClosed();
+
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
new file mode 100644
index 0000000..8bf2a99
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Exception thrown when a closed BufferAllocator is used. Note
+ * this is an unchecked exception.
+ *
+ * @param message string associated with the cause
+ */
+@SuppressWarnings("serial")
+public class AllocatorClosedException extends RuntimeException {
+  public AllocatorClosedException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
new file mode 100644
index 0000000..5142806
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.apache.drill.exec.memory.BaseAllocator.indent;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.BufferManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason that this isn't package private is we're forced to put DrillBuf in Netty's package which need access
+ * to these objects or methods.
+ *
+ * Threading: AllocatorManager manages thread-safety internally. Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocatorManager instance. Important note, there is one AllocatorManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocatorManager should be very low.
+ *
+ */
+public class AllocatorManager {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocatorManager.class);
+
+  private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
+
+  private final RootAllocator root;
+  private volatile BufferLedger owningLedger;
+  private final int size;
+  private final UnsafeDirectLittleEndian underlying;
+  private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
+  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
+  private final IdentityHashMap<DrillBuf, Object> buffers =
+      BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
+
+  AllocatorManager(BaseAllocator accountingAllocator, int size) {
+    Preconditions.checkNotNull(accountingAllocator);
+    this.root = accountingAllocator.root;
+    this.underlying = INNER_ALLOCATOR.allocate(size);
+    this.owningLedger = associate(accountingAllocator);
+    this.size = underlying.capacity();
+  }
+
+  /**
+   * Associate the existing underlying buffer with a new allocator.
+   *
+   * @param allocator
+   *          The target allocator to associate this buffer with.
+   * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
+   */
+  public BufferLedger associate(final BaseAllocator allocator) {
+    if (root != allocator.root) {
+      throw new IllegalStateException(
+          "A buffer can only be associated between two allocators that share the same root.");
+    }
+
+    try (AutoCloseableLock read = readLock.open()) {
+
+      final BufferLedger ledger = map.get(allocator);
+      if (ledger != null) {
+        return ledger;
+      }
+
+    }
+    try (AutoCloseableLock write = writeLock.open()) {
+      final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
+      map.put(allocator, ledger);
+      allocator.associateLedger(ledger);
+      return ledger;
+    }
+  }
+
+
+  /**
+   * The way that a particular BufferLedger communicates back to the AllocatorManager that it now longer needs to hold a
+   * reference to particular piece of memory.
+   */
+  private class ReleaseListener {
+
+    private final BufferAllocator allocator;
+
+    public ReleaseListener(BufferAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    public void release() {
+      try (AutoCloseableLock write = writeLock.open()) {
+        final BufferLedger oldLedger = map.remove(allocator);
+        oldLedger.allocator.dissociateLedger(oldLedger);
+
+        if (oldLedger == owningLedger) {
+          if (map.isEmpty()) {
+            // no one else owns, lets release.
+            oldLedger.allocator.releaseBytes(size);
+            underlying.release();
+          } else {
+            // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+            BufferLedger newLedger = map.values().iterator().next();
+
+            // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+            // since this consumer can do anything with this.
+            oldLedger.transferBalance(newLedger);
+          }
+        }
+
+
+      }
+    }
+  }
+
+  /**
+   * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+   * a set of DrillBufs that share a common fate and set of reference counts.
+   *
+   * As with AllocatorManager, the only reason this is public is due to DrillBuf being in io.netty.buffer package.
+   */
+  public class BufferLedger {
+    private final long id = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
+    private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
+                                                                  // correctly
+    private final BaseAllocator allocator;
+    private final ReleaseListener listener;
+    private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
+        "BufferLedger[%d]", 1)
+        : null;
+
+    private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
+      this.allocator = allocator;
+      this.listener = listener;
+    }
+
+    /**
+     * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
+     * memory, no transfer is made to the new ledger.
+     *
+     * @param target
+     *          The ledger to transfer ownership account to.
+     * @return Whether transfer fit within target ledgers limits.
+     */
+    public boolean transferBalance(BufferLedger target) {
+      Preconditions.checkNotNull(target);
+      Preconditions.checkArgument(allocator.root == target.allocator.root,
+          "You can only transfer between two allocators that share the same root.");
+
+      // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
+      // that this won't happen by synchronizing on the allocator manager instance.
+      synchronized (AllocatorManager.this) {
+        if (this != owningLedger || target == this) {
+          return true;
+        }
+
+        if (BaseAllocator.DEBUG) {
+          this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
+          target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
+        }
+
+        boolean overlimit = target.allocator.forceAllocate(size);
+        allocator.releaseBytes(size);
+        owningLedger = target;
+        return overlimit;
+      }
+
+    }
+
+    /**
+     * Print the current ledger state to a the provided StringBuilder.
+     *
+     * @param sb
+     *          The StringBuilder to populate.
+     * @param indent
+     *          The level of indentation to position the data.
+     * @param verbosity
+     *          The level of verbosity to print.
+     */
+    public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+      indent(sb, indent)
+          .append("ledger (allocator: ")
+          .append(allocator.name)
+          .append("), isOwning: ")
+          .append(owningLedger == this)
+          .append(", size: ")
+          .append(size)
+          .append(", references: ")
+          .append(bufRefCnt.get())
+          .append('\n');
+
+      if (BaseAllocator.DEBUG) {
+        synchronized (buffers) {
+          indent(sb, indent + 1).append("BufferLedger[" + id + "] holds ").append(buffers.size())
+              .append(" buffers. \n");
+          for (DrillBuf buf : buffers.keySet()) {
+            buf.print(sb, indent + 2, verbosity);
+            sb.append('\n');
+          }
+        }
+      }
+
+    }
+
+    /**
+     * Release this ledger. This means that all reference counts associated with this ledger are no longer used. This
+     * will inform the AllocatorManager to make a decision about how to manage any memory owned by this particular
+     * BufferLedger
+     */
+    public void release() {
+      listener.release();
+    }
+
+    /**
+     * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
+     * ledger associated with this AllocatorManager, a new one is created. This is placed on BufferLedger rather than
+     * AllocatorManager direclty because DrillBufs don't have access to AllocatorManager and they are the ones
+     * responsible for exposing the ability to associate mutliple allocators with a particular piece of underlying
+     * memory.
+     *
+     * @param allocator
+     * @return
+     */
+    public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
+      return associate((BaseAllocator) allocator);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocatorManager and memory. Does not impact reference count.
+     * Typically used for slicing.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public DrillBuf newDrillBuf(int offset, int length) {
+      return newDrillBuf(offset, length, null, false);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocatorManager and memory.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @param manager
+     *          An optional BufferManager argument that can be used to manage expansion of this DrillBuf
+     * @param retain
+     *          Whether or not the newly created buffer should get an additional reference count added to it.
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public DrillBuf newDrillBuf(int offset, int length, BufferManager manager, boolean retain) {
+      final DrillBuf buf = new DrillBuf(
+          bufRefCnt,
+          this,
+          underlying,
+          manager,
+          allocator.getAsByteBufAllocator(),
+          offset,
+          length,
+          false);
+
+      if (retain) {
+        buf.retain();
+      }
+
+      if (BaseAllocator.DEBUG) {
+        historicalLog.recordEvent(
+            "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
+                + "%d](%s)) => ledger hc == %d",
+            allocator.name, System.identityHashCode(buf), buf.toString(),
+            System.identityHashCode(this));
+
+        synchronized (buffers) {
+          buffers.put(buf, null);
+        }
+      }
+
+      return buf;
+
+    }
+
+    /**
+     * What is the total size (in bytes) of memory underlying this ledger.
+     *
+     * @return Size in bytes
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
+     * memory or zero in the case that this is not the owning ledger associated with this memory.
+     *
+     * @return Amount of accounted(owned) memory associated with this ledger.
+     */
+    public int getAccountedSize() {
+      try (AutoCloseableLock read = readLock.open()) {
+        if (owningLedger == this) {
+          return size;
+        } else {
+          return 0;
+        }
+      }
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    UnsafeDirectLittleEndian getUnderlying() {
+      return underlying;
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    boolean isOwningLedger() {
+      return this == owningLedger;
+    }
+
+  }
+
+}
\ No newline at end of file