You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:32 UTC

[09/13] drill git commit: DRILL-4134: Allocator Improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
new file mode 100644
index 0000000..78c3c73
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
+import org.apache.drill.exec.ops.BufferManager;
+import org.apache.drill.exec.util.AssertionUtil;
+
+import com.google.common.base.Preconditions;
+
+public abstract class BaseAllocator extends Accountant implements BufferAllocator {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+
+  public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
+
+  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+  private static final int CHUNK_SIZE = AllocatorManager.INNER_ALLOCATOR.getChunkSize();
+
+  public static final int DEBUG_LOG_LENGTH = 6;
+  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
+      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
+  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
+
+  private final BaseAllocator parentAllocator;
+  private final ByteBufAllocator thisAsByteBufAllocator;
+  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
+  private final DrillBuf empty;
+
+  private volatile boolean isClosed = false; // the allocator has been closed
+
+  // Package exposed for sharing between AllocatorManager and BaseAllocator objects
+  final String name;
+  final RootAllocator root;
+
+  // members used purely for debugging
+  private final IdentityHashMap<BufferLedger, Object> childLedgers;
+  private final IdentityHashMap<Reservation, Object> reservations;
+  private final HistoricalLog historicalLog;
+
+  protected BaseAllocator(
+      final BaseAllocator parentAllocator,
+      final String name,
+      final long initReservation,
+      final long maxAllocation) throws OutOfMemoryException {
+    super(parentAllocator, initReservation, maxAllocation);
+
+    if (parentAllocator != null) {
+      this.root = parentAllocator.root;
+      empty = parentAllocator.empty;
+    } else if (this instanceof RootAllocator) {
+      this.root = (RootAllocator) this;
+      empty = createEmpty();
+    } else {
+      throw new IllegalStateException("An parent allocator must either carry a root or be the root.");
+    }
+
+    this.parentAllocator = parentAllocator;
+    this.name = name;
+
+    // TODO: DRILL-4131
+    // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
+    this.thisAsByteBufAllocator = AllocatorManager.INNER_ALLOCATOR.allocator;
+
+    if (DEBUG) {
+      childAllocators = new IdentityHashMap<>();
+      reservations = new IdentityHashMap<>();
+      childLedgers = new IdentityHashMap<>();
+      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%s]", name);
+      hist("created by \"%s\", owned = %d", name, this.getAllocatedMemory());
+    } else {
+      childAllocators = null;
+      reservations = null;
+      historicalLog = null;
+      childLedgers = null;
+    }
+
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public DrillBuf getEmpty() {
+    return empty;
+  }
+
+  /**
+   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we have a new ledger
+   * associated with this allocator.
+   */
+  void associateLedger(BufferLedger ledger) {
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        childLedgers.put(ledger, null);
+      }
+    }
+  }
+
+  /**
+   * For debug/verification purposes only. Allows an AllocatorManager to tell the allocator that we are removing a
+   * ledger associated with this allocator
+   */
+  void dissociateLedger(BufferLedger ledger) {
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        if (!childLedgers.containsKey(ledger)) {
+          throw new IllegalStateException("Trying to remove a child ledger that doesn't exist.");
+        }
+        childLedgers.remove(ledger);
+      }
+    }
+  }
+
+  /**
+   * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes.
+   *
+   * @param childAllocator
+   *          The child allocator that has been closed.
+   */
+  private void childClosed(final BaseAllocator childAllocator) {
+    if (DEBUG) {
+      Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
+
+      synchronized (DEBUG_LOCK) {
+        final Object object = childAllocators.remove(childAllocator);
+        if (object == null) {
+          childAllocator.historicalLog.logHistory(logger);
+          throw new IllegalStateException("Child allocator[" + childAllocator.name
+              + "] not found in parent allocator[" + name + "]'s childAllocators");
+        }
+      }
+    }
+  }
+
+  private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final int requested) {
+    if (rounded != requested) {
+      return String.format(
+          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d",
+          rounded, requested, allocator.getAllocatedMemory());
+    } else {
+      return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
+          rounded, allocator.getAllocatedMemory());
+    }
+  }
+
+  @Override
+  public DrillBuf buffer(final int initialRequestSize) {
+    return buffer(initialRequestSize, null);
+  }
+
+  private DrillBuf createEmpty(){
+    return new DrillBuf(new AtomicInteger(), null, AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+  }
+
+  @Override
+  public DrillBuf buffer(final int initialRequestSize, BufferManager manager) {
+
+    Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
+
+    if (initialRequestSize == 0) {
+      return empty;
+    }
+
+    // round to next largest power of two if we're within a chunk since that is how our allocator operates
+    final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
+        nextPowerOfTwo(initialRequestSize)
+        : initialRequestSize;
+    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
+    if (!outcome.isOk()) {
+      throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
+    }
+
+    boolean success = false;
+    try {
+      DrillBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
+      success = true;
+      return buffer;
+    } finally {
+      if (!success) {
+        releaseBytes(actualRequestSize);
+      }
+    }
+
+  }
+
+  /**
+   * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical accounting associated
+   * with creating a new buffer, so callers must already have accounted for {@code size} bytes.
+   */
+  private DrillBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws OutOfMemoryException {
+    AllocatorManager manager = new AllocatorManager(this, size);
+    BufferLedger ledger = manager.associate(this);
+    DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
+
+    // make sure that our allocation is equal to what we expected (Guava templates only substitute %s, not %d).
+    Preconditions.checkArgument(buffer.capacity() == size,
+        "Allocated capacity %s was not equal to requested capacity %s.", buffer.capacity(), size);
+
+    return buffer;
+  }
+
+  @Override
+  public ByteBufAllocator getAsByteBufAllocator() {
+    return thisAsByteBufAllocator;
+  }
+
+  @Override
+  public BufferAllocator newChildAllocator(
+      final String name,
+      final long initReservation,
+      final long maxAllocation) {
+    final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation);
+
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        childAllocators.put(childAllocator, childAllocator);
+        historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, childAllocator.name);
+      }
+    }
+
+    return childAllocator;
+  }
+
+  public class Reservation implements AllocationReservation {
+    private int nBytes = 0;
+    private boolean used = false;
+    private boolean closed = false;
+    private final HistoricalLog historicalLog;
+
+    public Reservation() {
+      if (DEBUG) {
+        historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System.identityHashCode(this));
+        historicalLog.recordEvent("created");
+        synchronized (DEBUG_LOCK) {
+          reservations.put(this, this);
+        }
+      } else {
+        historicalLog = null;
+      }
+    }
+
+    public boolean add(final int nBytes) {
+      Preconditions.checkArgument(nBytes >= 0, "nBytes(%s) < 0", nBytes); // Guava templates only substitute %s
+      Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
+      Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
+
+      // We round up to the next power of two since all reservations are done in powers of two. This may overestimate
+      // the preallocation since someone may perceive additions to be power of two. If this becomes a problem, we can
+      // look at modifying this behavior so that we maintain what we reserve and what the user asked for and make
+      // sure to only round to power of two as necessary.
+      // NOTE(review): nBytes above 2^30 overflows nextPowerOfTwo() to a negative value -- confirm callers stay below.
+      final int nBytesTwo = BaseAllocator.nextPowerOfTwo(nBytes);
+      if (!reserve(nBytesTwo)) {
+        return false; // reserve() reported failure; the recorded reservation size is left unchanged
+      }
+
+      this.nBytes += nBytesTwo;
+      return true;
+    }
+
+    public DrillBuf allocateBuffer() {
+      Preconditions.checkState(!closed, "Attempt to allocate after closed");
+      Preconditions.checkState(!used, "Attempt to allocate more than once");
+
+      final DrillBuf drillBuf = allocate(nBytes);
+      used = true;
+      return drillBuf;
+    }
+
+    public int getSize() {
+      return nBytes;
+    }
+
+    public boolean isUsed() {
+      return used;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() {
+      if (closed) {
+        return;
+      }
+
+      if (DEBUG) {
+        if (!isClosed()) {
+          final Object object;
+          synchronized (DEBUG_LOCK) {
+            object = reservations.remove(this);
+          }
+          if (object == null) {
+            final StringBuilder sb = new StringBuilder();
+            print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+            logger.debug(sb.toString());
+            throw new IllegalStateException(
+                String.format("Didn't find closing reservation[%d]", System.identityHashCode(this)));
+          }
+
+          historicalLog.recordEvent("closed");
+        }
+      }
+
+      if (!used) {
+        releaseReservation(nBytes);
+      }
+
+      closed = true;
+    }
+
+    public boolean reserve(int nBytes) {
+      final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
+
+      if (DEBUG) {
+        historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk()));
+      }
+
+      return outcome.isOk();
+    }
+
+    /**
+     * Allocate a buffer of the requested size against this reservation.
+     *
+     * <p>
+     * The bytes are expected to have been reserved already; if buffer creation fails they are released again.
+     *
+     * @param nBytes
+     *          the size of the buffer requested
+     * @return the newly created buffer
+     */
+    private DrillBuf allocate(int nBytes) {
+      boolean success = false;
+
+      /*
+       * The reservation already added the requested bytes to the allocator's accounting via reserve(), so the
+       * buffer is created with bufferWithoutReservation() rather than buffer(). If that call fails, the finally
+       * block below hands the reserved bytes back through releaseBytes().
+       */
+      try {
+        final DrillBuf drillBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+
+        if (DEBUG) {
+          historicalLog.recordEvent("allocate() => %s", String.format("DrillBuf[%d]", drillBuf.getId()));
+        }
+        success = true;
+        return drillBuf;
+      } finally {
+        if (!success) {
+          releaseBytes(nBytes); // NOTE(review): 'used' stays false on this path, so close() may release again
+        }
+      }
+    }
+
+    /**
+     * Return the reservation back to the allocator without having used it.
+     *
+     * @param nBytes
+     *          the size of the reservation
+     */
+    private void releaseReservation(int nBytes) {
+      releaseBytes(nBytes);
+
+      if (DEBUG) {
+        historicalLog.recordEvent("releaseReservation(%d)", nBytes);
+      }
+    }
+
+  }
+
+  @Override
+  public AllocationReservation newReservation() {
+    return new Reservation();
+  }
+
+
+  @Override
+  public synchronized void close() {
+    /*
+     * Some owners may close more than once because of complex cleanup and shutdown
+     * procedures.
+     */
+    if (isClosed) {
+      return;
+    }
+
+    if (DEBUG) {
+      synchronized(DEBUG_LOCK) {
+        verifyAllocator();
+
+        // are there outstanding child allocators?
+        if (!childAllocators.isEmpty()) {
+          for (final BaseAllocator childAllocator : childAllocators.keySet()) {
+            if (childAllocator.isClosed) {
+              logger.warn(String.format(
+                  "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s",
+                  childAllocator.name, name, toString()));
+            }
+          }
+
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, toString()));
+        }
+
+        // are there outstanding buffers?
+        final int allocatedCount = childLedgers.size();
+        if (allocatedCount > 0) {
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
+                  name, allocatedCount, toString()));
+        }
+
+        if (reservations.size() != 0) {
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
+                  toString()));
+        }
+
+      }
+    }
+
+      // Is there unaccounted-for outstanding allocation?
+      final long allocated = getAllocatedMemory();
+      if (allocated > 0) {
+        throw new IllegalStateException(
+          String.format("Unaccounted for outstanding allocation (%d)\n%s", allocated, toString()));
+      }
+
+    // we need to release our memory to our parent before we tell it we've closed.
+    super.close();
+
+    // Inform our parent allocator that we've closed
+    if (parentAllocator != null) {
+      parentAllocator.childClosed(this);
+    }
+
+    if (DEBUG) {
+      historicalLog.recordEvent("closed");
+      logger.debug(String.format(
+          "closed allocator[%s].",
+          name));
+    }
+
+    isClosed = true;
+
+
+  }
+
+  public String toString() {
+    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
+        : Verbosity.BASIC;
+    final StringBuilder sb = new StringBuilder();
+    print(sb, 0, verbosity);
+    return sb.toString();
+  }
+
+  /**
+   * Provide a verbose string of the current allocator state. Includes the state of all child allocators, along with
+   * historical logs of each object and including stacktraces.
+   *
+   * @return A Verbose string of current allocator state.
+   */
+  public String toVerboseString() {
+    final StringBuilder sb = new StringBuilder();
+    print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+    return sb.toString();
+  }
+
+  private void hist(String noteFormat, Object... args) {
+    historicalLog.recordEvent(noteFormat, args);
+  }
+
+  /**
+   * Rounds the provided value up to the next power of two; powers of two (and 0) are returned unchanged.
+   *
+   * @param val
+   *          A non-negative integer; values above 2^30 overflow the shift below and yield a negative result.
+   * @return The smallest power of two that is greater than or equal to {@code val}.
+   */
+  static int nextPowerOfTwo(int val) {
+    int highestBit = Integer.highestOneBit(val);
+    if (highestBit == val) {
+      return val; // already a power of two (or zero)
+    } else {
+      return highestBit << 1; // the next power of two above val
+    }
+  }
+
+
+  /**
+   * Verifies the accounting state of the allocator. Only works for DEBUG.
+   *
+   * @throws IllegalStateException
+   *           when any problems are found
+   */
+  void verifyAllocator() {
+    final IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator> buffersSeen = new IdentityHashMap<>();
+    verifyAllocator(buffersSeen);
+  }
+
+  /**
+   * Verifies the accounting state of the allocator. Does nothing unless DEBUG is enabled.
+   *
+   * <p>
+   * This overload is used for recursive calls, allowing for checking that DrillBufs are unique across all allocators
+   * that are checked.
+   * </p>
+   *
+   * @param buffersSeen
+   *          a map of buffers that have already been seen when walking a tree of allocators
+   * @throws IllegalStateException
+   *           when any problems are found
+   */
+  private void verifyAllocator(final IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator> buffersSeen) {
+    // Check DEBUG before locking: DEBUG_LOCK is null when DEBUG is off, and synchronizing on null throws an NPE.
+    if (!DEBUG) {
+      return;
+    }
+
+    synchronized (DEBUG_LOCK) {
+
+      final long allocated = getAllocatedMemory();
+
+      // verify my direct descendants
+      final Set<BaseAllocator> childSet = childAllocators.keySet();
+      for (final BaseAllocator childAllocator : childSet) {
+        childAllocator.verifyAllocator(buffersSeen);
+      }
+
+      /*
+       * Verify my relationships with my descendants.
+       *
+       * The sum of direct child allocators' owned memory must be <= my allocated memory; my allocated memory also
+       * includes DrillBuf's directly allocated by me.
+       */
+      long childTotal = 0;
+      for (final BaseAllocator childAllocator : childSet) {
+        childTotal += Math.max(childAllocator.getAllocatedMemory(), childAllocator.reservation);
+      }
+      if (childTotal > getAllocatedMemory()) {
+        historicalLog.logHistory(logger);
+        logger.debug("allocator[" + name + "] child event logs BEGIN");
+        for (final BaseAllocator childAllocator : childSet) {
+          childAllocator.historicalLog.logHistory(logger);
+        }
+        logger.debug("allocator[" + name + "] child event logs END");
+        throw new IllegalStateException(
+            "Child allocators own more memory (" + childTotal + ") than their parent (name = "
+                + name + " ) has allocated (" + getAllocatedMemory() + ')');
+      }
+
+      // Furthermore, the amount I've allocated should be that plus buffers I've allocated.
+      long bufferTotal = 0;
+
+      final Set<BufferLedger> ledgerSet = childLedgers.keySet();
+      for (final BufferLedger ledger : ledgerSet) {
+        if (!ledger.isOwningLedger()) {
+          continue;
+        }
+
+        final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+        /*
+         * Even when shared, DrillBufs are rewrapped, so we should never see the same instance twice.
+         */
+        final BaseAllocator otherOwner = buffersSeen.get(udle);
+        if (otherOwner != null) {
+          throw new IllegalStateException("This allocator's drillBuf already owned by another allocator");
+        }
+        buffersSeen.put(udle, this);
+
+        bufferTotal += udle.maxCapacity();
+      }
+
+      // Preallocated space has to be accounted for
+      final Set<Reservation> reservationSet = reservations.keySet();
+      long reservedTotal = 0;
+      for (final Reservation reservation : reservationSet) {
+        if (!reservation.isUsed()) {
+          reservedTotal += reservation.getSize();
+        }
+      }
+
+      if (bufferTotal + reservedTotal + childTotal != getAllocatedMemory()) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("allocator[");
+        sb.append(name);
+        sb.append("]\nallocated: ");
+        sb.append(Long.toString(allocated));
+        sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
+        sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+        sb.append('\n');
+
+        if (bufferTotal != 0) {
+          sb.append("buffer total: ");
+          sb.append(Long.toString(bufferTotal));
+          sb.append('\n');
+          dumpBuffers(sb, ledgerSet);
+        }
+
+        if (childTotal != 0) {
+          sb.append("child total: ");
+          sb.append(Long.toString(childTotal));
+          sb.append('\n');
+
+          for (final BaseAllocator childAllocator : childSet) {
+            sb.append("child allocator[");
+            sb.append(childAllocator.name);
+            sb.append("] owned ");
+            sb.append(Long.toString(childAllocator.getAllocatedMemory()));
+            sb.append('\n');
+          }
+        }
+
+        if (reservedTotal != 0) {
+          sb.append(String.format("reserved total : %d bytes.", reservedTotal));
+          for (final Reservation reservation : reservationSet) {
+            reservation.historicalLog.buildHistory(sb, 0, true);
+            sb.append('\n');
+          }
+        }
+
+        logger.debug(sb.toString());
+        throw new IllegalStateException(String.format(
+            "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
+            name, bufferTotal, reservedTotal, childTotal, allocated));
+      }
+    }
+  }
+
+  void print(StringBuilder sb, int level, Verbosity verbosity) {
+
+    indent(sb, level)
+        .append("Allocator(")
+        .append(name)
+        .append(") ")
+        .append(reservation)
+        .append('/')
+        .append(getAllocatedMemory())
+        .append('/')
+        .append(getPeakMemoryAllocation())
+        .append('/')
+        .append(getLimit())
+        .append(" (res/actual/peak/limit)")
+        .append('\n');
+
+    if (DEBUG) {
+      indent(sb, level + 1).append(String.format("child allocators: %d\n", childAllocators.size()));
+      for (BaseAllocator child : childAllocators.keySet()) {
+        child.print(sb, level + 2, verbosity);
+      }
+
+      indent(sb, level + 1).append(String.format("ledgers: %d\n", childLedgers.size()));
+      for (BufferLedger ledger : childLedgers.keySet()) {
+        ledger.print(sb, level + 2, verbosity);
+      }
+
+      final Set<Reservation> reservations = this.reservations.keySet();
+      indent(sb, level + 1).append(String.format("reservations: %d\n", reservations.size()));
+      for (final Reservation reservation : reservations) {
+        if (verbosity.includeHistoricalLog) {
+          reservation.historicalLog.buildHistory(sb, level + 3, true);
+        }
+      }
+
+    }
+
+  }
+
+  private void dumpBuffers(final StringBuilder sb, final Set<BufferLedger> ledgerSet) {
+    for (final BufferLedger ledger : ledgerSet) {
+      if (!ledger.isOwningLedger()) {
+        continue; // only owning ledgers are reported, matching what verifyAllocator() sums into bufferTotal
+      }
+      final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+      sb.append("UnsafeDirectLittleEndian[identityHashCode == "); // label previously misspelled "dentityHashCode"
+      sb.append(Integer.toString(System.identityHashCode(udle)));
+      sb.append("] size ");
+      sb.append(Integer.toString(udle.maxCapacity()));
+      sb.append('\n');
+    }
+  }
+
+
+  public static StringBuilder indent(StringBuilder sb, int indent) {
+    final char[] indentation = new char[indent * 2];
+    Arrays.fill(indentation, ' ');
+    sb.append(indentation);
+    return sb;
+  }
+
+  public static enum Verbosity {
+    BASIC(false, false), // only include basic information
+    LOG(true, false), // include basic
+    LOG_WITH_STACKTRACE(true, true) //
+    ;
+
+    public final boolean includeHistoricalLog;
+    public final boolean includeStackTraces;
+
+    Verbosity(boolean includeHistoricalLog, boolean includeStackTraces) {
+      this.includeHistoricalLog = includeHistoricalLog;
+      this.includeStackTraces = includeStackTraces;
+    }
+  }
+
+  public static boolean isDebug() {
+    return DEBUG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java
index a15e348..bc61182 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java
@@ -31,4 +31,5 @@ public class BoundsChecking {
 
   private BoundsChecking() {
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 7d14b94..0226254 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.memory;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.DrillBuf;
 
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.BufferManager;
 
 /**
  * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
@@ -31,156 +31,119 @@ public interface BufferAllocator extends AutoCloseable {
    * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
    * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
    *
-   * @param size The size in bytes.
+   * @param size
+   *          The size in bytes.
    * @return a new DrillBuf, or null if the request can't be satisfied
-   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
+   * @throws OutOfMemoryException
+   *           if buffer cannot be allocated
    */
   public DrillBuf buffer(int size);
 
   /**
-   * Allocate a new or reused buffer within provided range. Note that the buffer may technically be larger than the
+   * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
    * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
    *
-   * @param minSize The minimum size in bytes.
-   * @param maxSize The maximum size in bytes.
+   * @param size
+   *          The size in bytes.
+   * @param manager
+   *          A buffer manager to manage reallocation.
    * @return a new DrillBuf, or null if the request can't be satisfied
-   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
+   * @throws OutOfMemoryException
+   *           if buffer cannot be allocated
    */
-  public DrillBuf buffer(int minSize, int maxSize);
+  public DrillBuf buffer(int size, BufferManager manager);
 
   /**
    * Returns the allocator this allocator falls back to when it needs more memory.
    *
    * @return the underlying allocator used by this allocator
    */
-  public ByteBufAllocator getUnderlyingAllocator();
-
-  /**
-   * Create a child allocator nested below this one.
-   *
-   * @param context - the owner or this allocator
-   * @param initialReservation - specified in bytes
-   * @param maximumReservation - specified in bytes
-   * @param applyFragmentLimit - flag to conditionally enable fragment memory limits
-   * @return - a new buffer allocator owned by the parent it was spawned from
-   */
-  @Deprecated
-  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
-      long maximumReservation, boolean applyFragmentLimit);
-
-  /**
-   * Flag: this allocator is a limiting sub-tree root, meaning that the maxAllocation for
-   * it applies to all its descendant child allocators. In low memory situations, the limits
-   * for sub-tree roots may be adjusted down so that they evenly share the total amount of
-   * direct memory across all the sub-tree roots.
-   */
-  public final static int F_LIMITING_ROOT = 0x0001;
+  public ByteBufAllocator getAsByteBufAllocator();
 
   /**
    * Create a new child allocator.
    *
-   * @param allocatorOwner the allocator owner
-   * @param initReservation the initial space reservation (obtained from this allocator)
-   * @param maxAllocation maximum amount of space the new allocator can allocate
-   * @param flags one or more of BufferAllocator.F_* flags
+   * @param name
+   *          the name of the allocator.
+   * @param initReservation
+   *          the initial space reservation (obtained from this allocator)
+   * @param maxAllocation
+   *          maximum amount of space the new allocator can allocate
    * @return the new allocator, or null if it can't be created
    */
-  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
-      long initReservation, long maxAllocation, int flags);
+  public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation);
 
   /**
-   * Take over ownership of the given buffer, adjusting accounting accordingly.
-   * This allocator always takes over ownership.
+   * Close and release all buffers generated from this buffer pool.
    *
-   * @param buf the buffer to take over
-   * @return false if over allocation
+   * <p>When assertions are on, complains if there are any outstanding buffers; to avoid
+   * that, release all buffers before the allocator is closed.
    */
-  public boolean takeOwnership(DrillBuf buf);
+  @Override
+  public void close();
 
   /**
-   * Share ownership of a buffer between allocators.
+   * Returns the amount of memory currently allocated from this allocator.
    *
-   * @param buf the buffer
-   * @param bufOut a new DrillBuf owned by this allocator, but sharing the same underlying buffer
-   * @return false if over allocation.
+   * @return the amount of memory currently allocated
    */
-  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
+  public long getAllocatedMemory();
 
   /**
-   * Not thread safe.
-   *
-   * WARNING: unclaimed pre-allocations leak memory. If you call preAllocate(), you must
-   * make sure to ultimately try to get the buffer and release it.
+   * Set the maximum amount of memory this allocator is allowed to allocate.
    *
-   * For Child allocators to set their Fragment limits.
-   *
-   * @param fragmentLimit the new fragment limit
+   * @param newLimit
+   *          The new Limit to apply to allocations
    */
-  @Deprecated // happens automatically, and via allocation policies
-  public void setFragmentLimit(long fragmentLimit);
+  public void setLimit(long newLimit);
 
   /**
-   * Returns the current fragment limit.
-   *
-   * @return the current fragment limit
-   */
-  /*
-   * TODO should be replaced with something more general because of
-   * the availability of multiple allocation policies
+   * Return the current maximum limit this allocator imposes.
    *
-   * TODO We should also have a getRemainingMemory() so operators
-   * can query how much more is left to allocate. That could be
-   * tricky.
+   * @return Limit in number of bytes.
    */
-  @Deprecated
-  public long getFragmentLimit();
+  public long getLimit();
 
   /**
-   * Return a unique Id for an allocator. Id's may be recycled after
-   * a long period of time.
-   *
-   * <p>Primary use for this is for debugging output.</p>
+   * Returns the peak amount of memory allocated from this allocator.
    *
-   * @return the allocator's id
+   * @return the peak amount of memory allocated
    */
-  public int getId();
+  public long getPeakMemoryAllocation();
 
   /**
-   * Close and release all buffers generated from this buffer pool.
+   * Create an allocation reservation. A reservation is a way of building up
+   * a request for a buffer whose size is not known in advance. See
+   * {@link AllocationReservation}.
    *
-   * <p>When assertions are on, complains if there are any outstanding buffers; to avoid
-   * that, release all buffers before the allocator is closed.
+   * @return the newly created reservation
    */
-  @Override
-  public void close() throws Exception;
+  public AllocationReservation newReservation();
 
   /**
-   * Returns the amount of memory currently allocated from this allocator.
-   *
-   * @return the amount of memory currently allocated
+   * Get a reference to the empty buffer associated with this allocator. Empty buffers are special because we don't
+   * worry about them leaking or managing reference counts on them since they don't actually point to any memory.
    */
-  public long getAllocatedMemory();
+  public DrillBuf getEmpty();
 
   /**
-   * Returns the peak amount of memory allocated from this allocator.
-   *
-   * @return the peak amount of memory allocated
+   * Return the name of this allocator. This is a human readable name that can help debugging. Typically provides
+   * coordinates about where this allocator was created
    */
-  public long getPeakMemoryAllocation();
+  public String getName();
 
   /**
-   * Returns an empty DrillBuf.
-   *
-   * @return an empty DrillBuf
+   * Return whether or not this allocator (or one of its parents) is over its limits. In the case that an allocator is
+   * over its limit, all consumers of that allocator should aggressively try to address the over-limit situation.
    */
-  public DrillBuf getEmpty();
+  public boolean isOverLimit();
 
   /**
-   * Create an allocation reservation. A reservation is a way of building up
-   * a request for a buffer whose size is not known in advance. See
-   * {@see AllocationReservation}.
+   * Return a verbose string describing this allocator. If in DEBUG mode, this will also include relevant stacktraces
+   * and historical logs for underlying objects
    *
-   * @return the newly created reservation
+   * @return A very verbose description of the allocator hierarchy.
    */
-  public AllocationReservation newReservation();
+  public String toVerboseString();
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
new file mode 100644
index 0000000..8fcabb1
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+
+/**
+ * Child allocator class. Only slightly different from the {@link RootAllocator},
+ * in that these can't be created directly, but must be obtained from
+ * {@link BufferAllocator#newChildAllocator(String, long, long)}.
+ *
+ * <p>Child allocators can only be created by the root, or other children, so
+ * this class is package private.</p>
+ */
+class ChildAllocator extends BaseAllocator {
+  /**
+   * Constructor. All arguments are passed through unchanged to the
+   * {@code BaseAllocator} constructor.
+   *
+   * @param parentAllocator parent allocator -- the one creating this child
+   * @param name the name of the allocator; a human readable label that can help
+   *   debugging (see {@link BufferAllocator#getName()})
+   * @param initReservation initial amount of space to reserve (obtained from the parent)
+   * @param maxAllocation maximum amount of space that can be obtained from this allocator;
+   *   note this includes direct allocations (via {@link BufferAllocator#buffer(int)}
+   *   et al) and requests from descendant allocators. Less memory may actually be
+   *   available, presumably when an ancestor allocator is close to its own limit
+   *   (NOTE(review): confirm against BaseAllocator)
+   */
+  ChildAllocator(
+      BaseAllocator parentAllocator,
+      String name,
+      long initReservation,
+      long maxAllocation) {
+    super(parentAllocator, name, initReservation, maxAllocation);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
new file mode 100644
index 0000000..ec423e2
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.ExpandableByteBuf;
+
+/**
+ * An implementation of ByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
+ * and managed using Drill's BufferAllocator infrastructure. The only thing different from a typical BufferAllocator is
+ * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
+ * DrillBufs to be expandable.
+ */
+public class DrillByteBufAllocator implements ByteBufAllocator {
+
+  private static final int DEFAULT_BUFFER_SIZE = 4096;
+  private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
+
+  private final BufferAllocator allocator;
+
+  public DrillByteBufAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public ByteBuf buffer() {
+    return buffer(DEFAULT_BUFFER_SIZE);
+  }
+
+  @Override
+  public ByteBuf buffer(int initialCapacity) {
+    return new ExpandableByteBuf(allocator.buffer(initialCapacity), allocator); // wrap the buffer obtained from the Drill allocator
+  }
+
+  @Override
+  public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity); // maxCapacity is ignored
+  }
+
+  @Override
+  public ByteBuf ioBuffer() {
+    return buffer();
+  }
+
+  @Override
+  public ByteBuf ioBuffer(int initialCapacity) {
+    return buffer(initialCapacity);
+  }
+
+  @Override
+  public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity); // maxCapacity is ignored
+  }
+
+  @Override
+  public ByteBuf directBuffer() {
+    return buffer();
+  }
+
+  @Override
+  public ByteBuf directBuffer(int initialCapacity) {
+    return allocator.buffer(initialCapacity); // unlike buffer(int), the result is NOT wrapped in an ExpandableByteBuf (NOTE(review): confirm this asymmetry is intentional)
+  }
+
+  @Override
+  public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity, maxCapacity); // delegates to buffer(int, int), which ignores maxCapacity
+  }
+
+  @Override
+  public CompositeByteBuf compositeBuffer() {
+    return compositeBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
+  }
+
+  @Override
+  public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+    return new CompositeByteBuf(this, true, maxNumComponents); // 'true' is presumably Netty's "direct" constructor flag -- verify against the Netty version in use
+  }
+
+  @Override
+  public CompositeByteBuf compositeDirectBuffer() {
+    return compositeBuffer();
+  }
+
+  @Override
+  public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+    return compositeBuffer(maxNumComponents);
+  }
+
+  @Override
+  public boolean isDirectBufferPooled() {
+    return false;
+  }
+
+  @Override
+  public ByteBuf heapBuffer() {
+    throw fail(); // heap buffers are unsupported; fail() itself throws
+  }
+
+  @Override
+  public ByteBuf heapBuffer(int initialCapacity) {
+    throw fail();
+  }
+
+  @Override
+  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+    throw fail();
+  }
+
+  @Override
+  public CompositeByteBuf compositeHeapBuffer() {
+    throw fail();
+  }
+
+  @Override
+  public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+    throw fail();
+  }
+
+  private RuntimeException fail() { // never returns normally; the return type only lets callers write "throw fail()"
+    throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/LimitConsumer.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/LimitConsumer.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/LimitConsumer.java
deleted file mode 100644
index 777d9d2..0000000
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/LimitConsumer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-public interface LimitConsumer {
-
-  public String getIdentifier();
-  public long getAllocated();
-
-  public long getLimit();
-
-  public void setLimit(long limit);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
new file mode 100644
index 0000000..cbb8d96
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/README.md
@@ -0,0 +1,121 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# Memory: Allocation, Accounting and Management
+ 
+The memory management package contains all the memory allocation related items that Drill uses to manage memory.
+
+
+## Key Components
+Memory management can be broken into the following main components:
+
+- Memory chunk allocation and fragmentation management
+  - `PooledByteBufAllocatorL` - A LittleEndian clone of Netty's jemalloc implementation
+  - `UnsafeDirectLittleEndian` - A base level memory access interface
+  - `LargeBuffer` - A buffer backing implementation used when working with data larger than one Netty chunk (default to 16mb)
+- Memory limits & Accounting
+  - `Accountant` - A nestable class of lock-free memory accountants.
+- Application-level memory allocation
+  - `BufferAllocator` - The public interface application users should be leveraging
+  - `BaseAllocator` - The base implementation of memory allocation, contains the meat of the Drill allocator implementation
+  - `RootAllocator` - The root allocator. Typically only one created for a JVM
+  - `ChildAllocator` - A child allocator that derives from the root allocator
+- Buffer ownership and transfer capabilities
+  - `AllocatorManager` - Responsible for managing the relationship between multiple allocators and a single chunk of memory
+  - `BufferLedger` - Responsible for maintaining the relationship between an `AllocatorManager`, a `BufferAllocator` and one or more individual `DrillBuf`s
+- Memory access
+  - `DrillBuf` - The facade for interacting directly with a chunk of memory.
+ 
+
+## Memory Management Overview
+Drill's memory model is based on the following basic concepts:
+
+ - Memory can be allocated up to some limit. That limit could be a real limit (OS/JVM) or a locally imposed limit.
+ - Allocation operates in two phases: accounting then actual allocation. Allocation could fail at either point.
+ - Allocation failure should be recoverable. In all cases, the Allocator infrastructure should expose memory allocation failures (OS or internal limit-based) as `OutOfMemoryException`s.
+ - Any allocator can reserve memory when created. This memory shall be held such that this allocator will always be able to allocate that amount of memory.
+ - A particular application component should work to use a local allocator to understand local memory usage and better debug memory leaks.
+ - The same physical memory can be shared by multiple allocators and the allocator must provide an accounting paradigm for this purpose.
+
+## Allocator Trees
+
+Drill provides a tree-based model for memory allocation. The RootAllocator is created first, then all allocators are created as children of that allocator. The RootAllocator is responsible for being the master bookkeeper for memory allocations. All other allocators are created as children of this tree. Each allocator can first determine whether it has enough local memory to satisfy a particular request. If not, the allocator can ask its parent for an additional memory allocation.
+
+## Reserving Memory
+
+Drill provides two different ways to reserve memory:
+
+  - BufferAllocator accounting reservations: 
+      When a new allocator (other than the `RootAllocator`) is initialized, it can set aside memory that it will keep locally for its lifetime. This is memory that will never be released back to its parent allocator until the allocator is closed.
+  - `AllocationReservation` via BufferAllocator.newReservation(): Allows a short-term preallocation strategy so that a particular subsystem can ensure future memory is available to support a particular request.
+  
+## Memory Ownership, Reference Counts and Sharing
+Many BufferAllocators can reference the same piece of memory at the same time. The most common situation for this is in the case of a Broadcast Join: in this situation many downstream operators in the same Drillbit will receive the same physical memory. Each of these operators will be operating within its own Allocator context. We therefore have multiple allocators all pointing at the same physical memory. It is the AllocatorManager's responsibility to ensure that, in this situation, all memory is accurately accounted for from the Root's perspective and also to ensure that the memory is correctly released once all BufferAllocators have stopped using that memory.
+
+For simplicity of accounting, we treat that memory as being used by one of the BufferAllocators associated with the memory. When that allocator releases its claim on that memory, the memory ownership is then moved to another BufferLedger belonging to the same AllocatorManager. Note that because a DrillBuf.release() is what actually causes memory ownership transfer to occur, we always proceed with ownership transfer (even if that violates an allocator limit). It is the responsibility of the application owning a particular allocator to frequently confirm whether the allocator is over its memory limit (BufferAllocator.isOverLimit()) and if so, attempt to aggressively release memory to ameliorate the situation.
+
+All DrillBufs (direct or sliced) related to a single BufferLedger/BufferAllocator combination share the same reference count and either all will be valid or all will be invalid.
+
+## Object Hierarchy
+
+There are two main ways that someone can look at the object hierarchy for Drill's memory management scheme. The first is a memory based perspective as below:
+
+### Memory Perspective
+<pre>
++ AllocatorManager
+|
+|-- UnsafeDirectLittleEndian (One per AllocatorManager)
+|
+|-+ BufferLedger 1 ==> Allocator A (owning)
+| ` - DrillBuf 1
+|-+ BufferLedger 2 ==> Allocator B (non-owning)
+| ` - DrillBuf 2
+|-+ BufferLedger 3 ==> Allocator C (non-owning)
+  | - DrillBuf 3
+  | - DrillBuf 4
+  ` - DrillBuf 5
+</pre>
+
+In this picture, a piece of memory is owned by an allocator manager. An allocator manager is responsible for that piece of memory no matter which allocator(s) it is working with. An allocator manager will have relationships with a piece of raw memory (via its reference to UnsafeDirectLittleEndian) as well as references to each BufferAllocator it has a relationship to.
+
+### Allocator Perspective
+<pre>
++ RootAllocator
+|-+ ChildAllocator 1
+| | - ChildAllocator 1.1
+| ` ...
+|
+|-+ ChildAllocator 2
+|-+ ChildAllocator 3
+| |
+| |-+ BufferLedger 1 ==> AllocatorManager 1 (owning) ==> UDLE
+| | `- DrillBuf 1
+| `-+ BufferLedger 2 ==> AllocatorManager 2 (non-owning)==> UDLE
+| 	`- DrillBuf 2
+|
+|-+ BufferLedger 3 ==> AllocatorManager 1 (non-owning)==> UDLE
+| ` - DrillBuf 3
+|-+ BufferLedger 4 ==> AllocatorManager 2 (owning) ==> UDLE
+  | - DrillBuf 4
+  | - DrillBuf 5
+  ` - DrillBuf 6
+</pre>
+
+In this picture, a RootAllocator owns three ChildAllocators. The first ChildAllocator (ChildAllocator 1) owns a subsequent ChildAllocator. ChildAllocator 3 has two BufferLedgers/AllocatorManager references. Coincidentally, each of these AllocatorManagers is also associated with the RootAllocator. In this case, one of these AllocatorManagers is owned by ChildAllocator 3 (AllocatorManager 1) while the other AllocatorManager (AllocatorManager 2) is owned/accounted for by the RootAllocator. Note that in this scenario, DrillBuf 1 is sharing the same underlying memory as DrillBuf 3. However the subset of that memory (e.g. through slicing) might be different. Also note that DrillBuf 2 and DrillBuf 4, 5 and 6 are also sharing the same underlying memory. Also note that DrillBuf 4, 5 and 6 all share the same reference count and fate.
+
+## Debugging Issues
+The Allocator object provides a useful set of tools to better understand the status of the allocator. If in `DEBUG` mode, the allocator and supporting classes will record additional debug tracking information to better track down memory leaks and issues. To enable DEBUG mode, either enable Java assertions with `-ea` or pass the following system property to the VM when starting `-Ddrill.memory.debug.allocator=true`. The BufferAllocator also provides a `BufferAllocator.toVerboseString()` which can be used in DEBUG mode to get extensive stacktrace information and events associated with various Allocator behaviors.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
new file mode 100644
index 0000000..5ab4130
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The root allocator for using direct memory inside a Drillbit. Supports creating a
+ * tree of descendant child allocators.
+ */
+public class RootAllocator extends BaseAllocator {
+
+  public RootAllocator(final long limit) {
+    super(null, "ROOT", 0, limit); // same argument order as ChildAllocator's super(parentAllocator, name, initReservation, maxAllocation): no parent, name "ROOT", zero initial reservation, limit as max allocation
+  }
+
+  /**
+   * Verify the accounting state of the allocation system.
+   */
+  @VisibleForTesting
+  public void verify() {
+    verifyAllocator(); // not defined in this class; presumably inherited from BaseAllocator -- confirm
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/org/apache/drill/exec/memory/package-info.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/package-info.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/package-info.java
new file mode 100644
index 0000000..3c1b9e5
--- /dev/null
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *  Memory Allocation, Accounting and Management
+ *
+ *  See the README.md file in this directory for detailed information about Drill's memory allocation subsystem.
+ *
+ */
+package org.apache.drill.exec.memory;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/resources/drill-module.conf b/exec/memory/base/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..593ef8e
--- /dev/null
+++ b/exec/memory/base/src/main/resources/drill-module.conf
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill: {
+  memory: {
+    debug.error_on_leak: true,
+    top.max: 1000000000000
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestAccountant.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestAccountant.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestAccountant.java
new file mode 100644
index 0000000..31c733f
--- /dev/null
+++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestAccountant.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.exec.memory.Accountant.AllocationOutcome;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAccountant {
+
+  /** Runs the reservation scenario with no outside parent ({@code null}) above the scenario's own parent. */
+  @Test
+  public void basic() {
+    ensureAccurateReservations(null);
+  }
+
+  /**
+   * Runs the reservation scenario underneath an outer accountant and checks that the
+   * outer accountant reports zero allocated memory once the scenario has finished.
+   */
+  @Test
+  public void nested() {
+    final Accountant outer = new Accountant(null, 0, Long.MAX_VALUE);
+
+    ensureAccurateReservations(outer);
+
+    assertEquals(0, outer.getAllocatedMemory());
+  }
+
+  /**
+   * Runs the reservation scenario concurrently from many threads that all share one outer
+   * accountant, then checks that the outer accountant ends with zero allocated memory.
+   *
+   * <p>Anything thrown by a worker, including the {@link AssertionError}s raised by JUnit
+   * assertions, is captured and rethrown on the test thread once every worker has been
+   * joined. A failure that is only raised inside a worker thread does not fail the test.
+   *
+   * @throws InterruptedException if the test thread is interrupted while joining a worker
+   */
+  @Test
+  public void multiThread() throws InterruptedException {
+    final Accountant parent = new Accountant(null, 0, Long.MAX_VALUE);
+
+    final int numberOfThreads = 32;
+    final int loops = 100;
+    final Thread[] threads = new Thread[numberOfThreads];
+
+    // First failure seen by any worker; stays null when every worker succeeds.
+    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+
+    for (int i = 0; i < numberOfThreads; i++) {
+      Thread t = new Thread() {
+
+        @Override
+        public void run() {
+          try {
+            for (int i = 0; i < loops; i++) {
+              ensureAccurateReservations(parent);
+            }
+          } catch (Throwable ex) {
+            // Throwable rather than Exception: assertion failures are Errors.
+            failure.compareAndSet(null, ex);
+          }
+        }
+
+      };
+      threads[i] = t;
+      t.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    // Surface the worker failure on the thread JUnit is watching.
+    final Throwable workerFailure = failure.get();
+    if (workerFailure != null) {
+      throw new AssertionError("worker thread failed: " + workerFailure, workerFailure);
+    }
+
+    assertEquals(0, parent.getAllocatedMemory());
+  }
+
+  /**
+   * Walks a parent/child accountant pair through a series of allocations and releases and
+   * checks the allocated-memory figure reported by both accountants after every step.
+   *
+   * <p>The parent is created under {@code outsideParent} with a limit of 10. The child is
+   * created under that parent with a reservation of 2, which the parent reports as allocated
+   * as soon as the child exists. Both accountants are closed before returning.
+   *
+   * @param outsideParent accountant to create the scenario's parent under; may be {@code null}
+   */
+  private void ensureAccurateReservations(Accountant outsideParent) {
+    final Accountant parent = new Accountant(outsideParent, 0, 10);
+    assertEquals(0, parent.getAllocatedMemory());
+
+    final Accountant child = new Accountant(parent, 2, Long.MAX_VALUE);
+    assertEquals(2, parent.getAllocatedMemory());
+
+    {
+      AllocationOutcome first = child.allocateBytes(1);
+      assertEquals(AllocationOutcome.SUCCESS, first);
+    }
+
+    // child will have new allocation
+    assertEquals(1, child.getAllocatedMemory());
+
+    // parent has no change since within reservation
+    assertEquals(2, parent.getAllocatedMemory());
+
+    {
+      AllocationOutcome first = child.allocateBytes(1);
+      assertEquals(AllocationOutcome.SUCCESS, first);
+    }
+
+    // child will have new allocation
+    assertEquals(2, child.getAllocatedMemory());
+
+    // parent has no change since within reservation
+    assertEquals(2, parent.getAllocatedMemory());
+
+    child.releaseBytes(1);
+
+    // child allocation shrinks by the released byte
+    assertEquals(1, child.getAllocatedMemory());
+
+    // parent has no change since the child is still within its reservation
+    assertEquals(2, parent.getAllocatedMemory());
+
+    {
+      AllocationOutcome first = child.allocateBytes(2);
+      assertEquals(AllocationOutcome.SUCCESS, first);
+    }
+
+    // child will have new allocation
+    assertEquals(3, child.getAllocatedMemory());
+
+    // went beyond reservation, now in parent accountant
+    assertEquals(3, parent.getAllocatedMemory());
+
+    {
+      AllocationOutcome first = child.allocateBytes(7);
+      assertEquals(AllocationOutcome.SUCCESS, first);
+    }
+
+    // child will have new allocation
+    assertEquals(10, child.getAllocatedMemory());
+
+    // still beyond reservation, parent now exactly at its limit
+    assertEquals(10, parent.getAllocatedMemory());
+
+    child.releaseBytes(9);
+
+    assertEquals(1, child.getAllocatedMemory());
+
+    // back to reservation size
+    assertEquals(2, parent.getAllocatedMemory());
+
+    // 1 + 10 would exceed the parent's limit of 10, so the request is refused
+    AllocationOutcome first = child.allocateBytes(10);
+    assertEquals(AllocationOutcome.FAILED_PARENT, first);
+
+    // unchanged
+    assertEquals(1, child.getAllocatedMemory());
+    assertEquals(2, parent.getAllocatedMemory());
+
+    // a forced allocation is recorded anyway, but reports that it is over the limit
+    boolean withinLimit = child.forceAllocate(10);
+    assertEquals(false, withinLimit);
+
+    // both accountants now report the forced total (expected value first, actual second)
+    assertEquals(11, child.getAllocatedMemory());
+    assertEquals(11, parent.getAllocatedMemory());
+
+
+    child.releaseBytes(11);
+    assertEquals(0, child.getAllocatedMemory());
+    assertEquals(2, parent.getAllocatedMemory());
+
+    child.close();
+    parent.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
new file mode 100644
index 0000000..780d217
--- /dev/null
+++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
@@ -0,0 +1,645 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.DrillBuf.TransferResult;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestBaseAllocator {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
+
+  private final static int MAX_ALLOCATION = 8 * 1024;
+
+/*
+  // ---------------------------------------- DEBUG -----------------------------------
+
+  @After
+  public void checkBuffers() {
+    final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+    if (bufferCount != 0) {
+      UnsafeDirectLittleEndian.logBuffers(logger);
+      UnsafeDirectLittleEndian.releaseBuffers();
+    }
+
+    assertEquals(0, bufferCount);
+  }
+
+//  @AfterClass
+//  public static void dumpBuffers() {
+//    UnsafeDirectLittleEndian.logBuffers(logger);
+//  }
+
+  // ---------------------------------------- DEBUG ------------------------------------
+*/
+
+
+  /**
+   * Allocates half of the root limit directly from the root allocator and the other half
+   * from a child allocator, then releases both buffers.
+   */
+  @Test
+  public void test_privateMax() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      final DrillBuf rootBuf = root.buffer(MAX_ALLOCATION / 2);
+      assertNotNull("allocation failed", rootBuf);
+
+      try (final BufferAllocator child = root.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) {
+        final DrillBuf childBuf = child.buffer(MAX_ALLOCATION / 2);
+        assertNotNull("allocation failed", childBuf);
+        childBuf.release();
+      }
+
+      rootBuf.release();
+    }
+  }
+
+  /** Closing a root allocator that still has an unreleased buffer is expected to throw. */
+  @Test(expected=IllegalStateException.class)
+  public void testRootAllocator_closeWithOutstanding() throws Exception {
+    try {
+      try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+        // Intentionally never released.
+        final DrillBuf leaked = root.buffer(512);
+        assertNotNull("allocation failed", leaked);
+      }
+    } finally {
+      /*
+       * We expect there to be one unreleased underlying buffer because we're closing
+       * without releasing it.
+       */
+/*
+      // ------------------------------- DEBUG ---------------------------------
+      final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+      UnsafeDirectLittleEndian.releaseBuffers();
+      assertEquals(1, bufferCount);
+      // ------------------------------- DEBUG ---------------------------------
+*/
+    }
+  }
+
+  /** A zero-length request yields a non-null buffer whose capacity is zero. */
+  @Test
+  public void testRootAllocator_getEmpty() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      final DrillBuf empty = root.buffer(0);
+      assertNotNull("allocation failed", empty);
+
+      final int capacity = empty.capacity();
+      assertEquals("capacity was non-zero", 0, capacity);
+
+      empty.release();
+    }
+  }
+
+  /** Currently ignored: closing the root with an unreleased zero-length buffer is expected to throw. */
+  @Ignore // TODO(DRILL-2740)
+  @Test(expected = IllegalStateException.class)
+  public void testAllocator_unreleasedEmpty() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      // Intentionally never released.
+      root.buffer(0);
+    }
+  }
+
+  /**
+   * Transfers a buffer allocated by one child allocator to a second child allocator,
+   * verifying the root allocator between steps, and then closes both children.
+   */
+  @Test
+  public void testAllocator_transferOwnership() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      final BufferAllocator source = root.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION);
+      final BufferAllocator target = root.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION);
+
+      final DrillBuf original = source.buffer(MAX_ALLOCATION / 4);
+      root.verify();
+
+      final TransferResult transfer = original.transferOwnership(target);
+      final boolean fits = transfer.allocationFit;
+      root.verify();
+      assertTrue(fits);
+
+      // The source side can be torn down first.
+      original.release();
+      source.close();
+      root.verify();
+
+      // Then the transferred buffer and its new owner.
+      transfer.buffer.release();
+      target.close();
+    }
+  }
+
+  /**
+   * Shares one buffer with a second and then a third child allocator via {@code retain},
+   * releasing each buffer and closing each child allocator in turn, and verifies the root
+   * allocator after every step.
+   */
+  @Test
+  public void testAllocator_shareOwnership() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+      final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION);
+      final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, MAX_ALLOCATION);
+      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
+      rootAllocator.verify();
+
+      // share ownership of buffer.
+      final DrillBuf drillBuf2 = drillBuf1.retain(childAllocator2);
+      rootAllocator.verify();
+      assertNotNull(drillBuf2);
+      assertNotEquals(drillBuf2, drillBuf1);
+
+      // release the original buffer, thus transferring ownership to allocator 2 (should leave allocator 1 empty)
+      drillBuf1.release();
+      rootAllocator.verify();
+      childAllocator1.close();
+      rootAllocator.verify();
+
+      // NOTE(review): drillBuf1 was released above; retaining through it presumably works because
+      // drillBuf2 still holds the shared memory - confirm this is intended rather than drillBuf2.retain(...).
+      final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("shareOwnership3", 0, MAX_ALLOCATION);
+      final DrillBuf drillBuf3 = drillBuf1.retain(childAllocator3);
+      assertNotNull(drillBuf3);
+      assertNotEquals(drillBuf3, drillBuf1);
+      assertNotEquals(drillBuf3, drillBuf2);
+      rootAllocator.verify();
+
+      drillBuf2.release();
+      rootAllocator.verify();
+      childAllocator2.close();
+      rootAllocator.verify();
+
+      drillBuf3.release();
+      rootAllocator.verify();
+      childAllocator3.close();
+    }
+  }
+
+  @Test
+  public void testRootAllocator_createChildAndUse() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+      try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createChildAndUse", 0,
+          MAX_ALLOCATION)) {
+        final DrillBuf drillBuf = childAllocator.buffer(512);
+        assertNotNull("allocation failed", drillBuf);
+        drillBuf.release();
+      }
+    }
+  }
+
+  /**
+   * Closing the root allocator while a child allocator is still open and holding a buffer
+   * is expected to throw.
+   */
+  @Test(expected=IllegalStateException.class)
+  public void testRootAllocator_createChildDontClose() throws Exception {
+    try {
+      try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+        // Neither the child allocator nor its buffer is ever closed/released.
+        final BufferAllocator orphan = root.newChildAllocator("createChildDontClose", 0, MAX_ALLOCATION);
+        final DrillBuf leaked = orphan.buffer(512);
+        assertNotNull("allocation failed", leaked);
+      }
+    } finally {
+      /*
+       * We expect one underlying buffer because we closed a child allocator without
+       * releasing the buffer allocated from it.
+       */
+/*
+      // ------------------------------- DEBUG ---------------------------------
+      final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+      UnsafeDirectLittleEndian.releaseBuffers();
+      assertEquals(1, bufferCount);
+      // ------------------------------- DEBUG ---------------------------------
+*/
+    }
+  }
+
+  /**
+   * Exercises an allocator with three allocate/release patterns: one small buffer, one buffer
+   * of {@code MAX_ALLOCATION} bytes, and eight equal buffers held at the same time.
+   *
+   * @param allocator the allocator to allocate from
+   */
+  private static void allocateAndFree(final BufferAllocator allocator) {
+    final DrillBuf small = allocator.buffer(512);
+    assertNotNull("allocation failed", small);
+    small.release();
+
+    final DrillBuf whole = allocator.buffer(MAX_ALLOCATION);
+    assertNotNull("allocation failed", whole);
+    whole.release();
+
+    final int count = 8;
+    final DrillBuf[] held = new DrillBuf[count];
+    for (int idx = 0; idx < count; idx++) {
+      final DrillBuf piece = allocator.buffer(MAX_ALLOCATION / count);
+      assertNotNull("allocation failed", piece);
+      held[idx] = piece;
+    }
+    for (int idx = 0; idx < count; idx++) {
+      held[idx].release();
+    }
+  }
+
+  @Test
+  public void testAllocator_manyAllocations() throws Exception {
+    try(final RootAllocator rootAllocator =
+        new RootAllocator(MAX_ALLOCATION)) {
+      try(final BufferAllocator childAllocator =
+          rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) {
+        allocateAndFree(childAllocator);
+      }
+    }
+  }
+
+  @Test
+  public void testAllocator_overAllocate() throws Exception {
+    try(final RootAllocator rootAllocator =
+        new RootAllocator(MAX_ALLOCATION)) {
+      try(final BufferAllocator childAllocator =
+          rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) {
+        allocateAndFree(childAllocator);
+
+        try {
+          childAllocator.buffer(MAX_ALLOCATION + 1);
+          fail("allocated memory beyond max allowed");
+        } catch (OutOfMemoryException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAllocator_overAllocateParent() throws Exception {
+    try(final RootAllocator rootAllocator =
+        new RootAllocator(MAX_ALLOCATION)) {
+      try(final BufferAllocator childAllocator =
+          rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) {
+        final DrillBuf drillBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
+        assertNotNull("allocation failed", drillBuf1);
+        final DrillBuf drillBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
+        assertNotNull("allocation failed", drillBuf2);
+
+        try {
+          childAllocator.buffer(MAX_ALLOCATION / 4);
+          fail("allocated memory beyond max allowed");
+        } catch (OutOfMemoryException e) {
+          // expected
+        }
+
+        drillBuf1.release();
+        drillBuf2.release();
+      }
+    }
+  }
+
+  /**
+   * Allocates a buffer, takes three successively nested slices of it, and releases through
+   * one of the slices, verifying the root allocator after every step.
+   *
+   * @param rootAllocator the root allocator to verify
+   * @param bufferAllocator the allocator the buffer is taken from
+   */
+  private static void testAllocator_sliceUpBufferAndRelease(
+      final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
+    final DrillBuf base = bufferAllocator.buffer(MAX_ALLOCATION / 2);
+    rootAllocator.verify();
+
+    // Each slice trims 16 bytes off both ends of its parent.
+    final DrillBuf outer = base.slice(16, base.capacity() - 32);
+    rootAllocator.verify();
+    final DrillBuf middle = outer.slice(16, outer.capacity() - 32);
+    rootAllocator.verify();
+    middle.slice(16, middle.capacity() - 32);
+    rootAllocator.verify();
+
+    middle.release(); // since they share refcounts, one is enough to release them all
+    rootAllocator.verify();
+  }
+
+  /**
+   * Runs the slice-and-release helper against the root allocator, a child allocator, and a
+   * child allocator that itself has a grandchild holding a sliced buffer, verifying the root
+   * allocator in between.
+   */
+  @Test
+  public void testAllocator_createSlices() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      testAllocator_sliceUpBufferAndRelease(root, root);
+
+      try (final BufferAllocator child = root.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+        testAllocator_sliceUpBufferAndRelease(root, child);
+      }
+      root.verify();
+
+      testAllocator_sliceUpBufferAndRelease(root, root);
+
+      try (final BufferAllocator child = root.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+        try (final BufferAllocator grandchild = child.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+          final DrillBuf held = grandchild.buffer(MAX_ALLOCATION / 8);
+          held.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
+          // The helper runs against the child while the grandchild still holds a buffer.
+          testAllocator_sliceUpBufferAndRelease(root, child);
+          held.release();
+          root.verify();
+        }
+        root.verify();
+
+        testAllocator_sliceUpBufferAndRelease(root, child);
+      }
+      root.verify();
+    }
+  }
+
+  /**
+   * Checks reader/writer indices and contents of slices taken from a 256-byte buffer, both
+   * before and after the buffer has been filled with bytes equal to their own index.
+   */
+  @Test
+  public void testAllocator_sliceRanges() throws Exception {
+//    final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
+    try(final RootAllocator rootAllocator =
+        new RootAllocator(MAX_ALLOCATION)) {
+      // A freshly allocated buffer is empty: nothing readable, everything writable.
+      final DrillBuf drillBuf = rootAllocator.buffer(256);
+      assertEquals(256, drillBuf.capacity());
+      assertEquals(0, drillBuf.readerIndex());
+      assertEquals(0, drillBuf.readableBytes());
+      assertEquals(0, drillBuf.writerIndex());
+      assertEquals(256, drillBuf.writableBytes());
+
+      // Slice taken before anything was written (capacity checks below are disabled).
+      final DrillBuf slice3 = (DrillBuf) drillBuf.slice();
+      assertEquals(0, slice3.readerIndex());
+      assertEquals(0, slice3.readableBytes());
+      assertEquals(0, slice3.writerIndex());
+//      assertEquals(256, slice3.capacity());
+//      assertEquals(256, slice3.writableBytes());
+
+      // Populate the buffer with byte values corresponding to their indices.
+      for(int i = 0; i < 256; ++i) {
+        drillBuf.writeByte(i);
+      }
+      assertEquals(0, drillBuf.readerIndex());
+      assertEquals(256, drillBuf.readableBytes());
+      assertEquals(256, drillBuf.writerIndex());
+      assertEquals(0, drillBuf.writableBytes());
+
+      // Slice of the full buffer: relative reads advance, absolute reads see every byte.
+      final DrillBuf slice1 = (DrillBuf) drillBuf.slice();
+      assertEquals(0, slice1.readerIndex());
+      assertEquals(256, slice1.readableBytes());
+      for(int i = 0; i < 10; ++i) {
+        assertEquals(i, slice1.readByte());
+      }
+      assertEquals(256 - 10, slice1.readableBytes());
+      for(int i = 0; i < 256; ++i) {
+        assertEquals((byte) i, slice1.getByte(i));
+      }
+
+      // Slice of bytes 25..49: its own indices start at zero.
+      final DrillBuf slice2 = (DrillBuf) drillBuf.slice(25, 25);
+      assertEquals(0, slice2.readerIndex());
+      assertEquals(25, slice2.readableBytes());
+      for(int i = 25; i < 50; ++i) {
+        assertEquals(i, slice2.readByte());
+      }
+
+/*
+      for(int i = 256; i > 0; --i) {
+        slice3.writeByte(i - 1);
+      }
+      for(int i = 0; i < 256; ++i) {
+        assertEquals(255 - i, slice1.getByte(i));
+      }
+*/
+
+      drillBuf.release(); // all the derived buffers share this fate
+    }
+  }
+
+  /**
+   * Fills a 256-byte buffer with bytes equal to their own index, then takes a chain of
+   * slices (each a slice of the previous one, starting 10 bytes further in) and checks that
+   * every slice exposes the expected bytes at its own zero-based indices.
+   */
+  @Test
+  public void testAllocator_slicesOfSlices() throws Exception {
+//    final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
+    try(final RootAllocator rootAllocator =
+        new RootAllocator(MAX_ALLOCATION)) {
+      // Populate a buffer with byte values corresponding to their indices.
+      final DrillBuf drillBuf = rootAllocator.buffer(256);
+      for(int i = 0; i < 256; ++i) {
+        drillBuf.writeByte(i);
+      }
+
+      // Slice it up. The full-range slice must expose exactly the bytes of the buffer.
+      final DrillBuf slice0 = drillBuf.slice(0, drillBuf.capacity());
+      for(int i = 0; i < 256; ++i) {
+        assertEquals((byte) i, slice0.getByte(i));
+      }
+
+      final DrillBuf slice10 = slice0.slice(10, drillBuf.capacity() - 10);
+      for(int i = 10; i < 256; ++i) {
+        assertEquals((byte) i, slice10.getByte(i - 10));
+      }
+
+      final DrillBuf slice20 = slice10.slice(10, drillBuf.capacity() - 20);
+      for(int i = 20; i < 256; ++i) {
+        assertEquals((byte) i, slice20.getByte(i - 20));
+      }
+
+      final DrillBuf slice30 = slice20.slice(10,  drillBuf.capacity() - 30);
+      for(int i = 30; i < 256; ++i) {
+        assertEquals((byte) i, slice30.getByte(i - 30));
+      }
+
+      drillBuf.release();
+    }
+  }
+
+  /**
+   * Allocates one buffer in each of two child allocators, slices both, and transfers each
+   * slice to the other allocator, verifying the root allocator along the way.
+   */
+  @Test
+  public void testAllocator_transferSliced() throws Exception {
+    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
+      final BufferAllocator first = root.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION);
+      final BufferAllocator second = root.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION);
+
+      final DrillBuf firstBuf = first.buffer(MAX_ALLOCATION / 8);
+      final DrillBuf secondBuf = second.buffer(MAX_ALLOCATION / 8);
+
+      final DrillBuf firstSlice = firstBuf.slice(0, firstBuf.capacity() / 2);
+      final DrillBuf secondSlice = secondBuf.slice(0, secondBuf.capacity() / 2);
+
+      root.verify();
+
+      // Cross over: each slice moves to the allocator it did not come from.
+      final TransferResult toFirst = secondSlice.transferOwnership(first);
+      root.verify();
+      final TransferResult toSecond = firstSlice.transferOwnership(second);
+      root.verify();
+
+      toFirst.buffer.release();
+      toSecond.buffer.release();
+
+      firstSlice.release(); // releases firstBuf
+      secondSlice.release(); // releases secondBuf
+
+      first.close();
+      second.close();
+    }
+  }
+
+  /**
+   * Allocates one buffer in each of two child allocators, slices both, and shares each slice
+   * with the other allocator via {@code retain}, verifying the root allocator along the way.
+   */
+  @Test
+  public void testAllocator_shareSliced() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+      // Distinct labels named after this test, so diagnostics can tell the two children apart.
+      final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareSliced1", 0, MAX_ALLOCATION);
+      final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareSliced2", 0, MAX_ALLOCATION);
+
+      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+      final DrillBuf drillBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+
+      final DrillBuf drillBuf1s = drillBuf1.slice(0, drillBuf1.capacity() / 2);
+      final DrillBuf drillBuf2s = drillBuf2.slice(0, drillBuf2.capacity() / 2);
+
+      rootAllocator.verify();
+
+      // Cross over: each slice is shared with the allocator it did not come from.
+      final DrillBuf drillBuf2s1 = drillBuf2s.retain(childAllocator1);
+      final DrillBuf drillBuf1s2 = drillBuf1s.retain(childAllocator2);
+      rootAllocator.verify();
+
+      drillBuf1s.release(); // releases drillBuf1
+      drillBuf2s.release(); // releases drillBuf2
+      rootAllocator.verify();
+
+      drillBuf2s1.release(); // releases the shared drillBuf2 slice
+      drillBuf1s2.release(); // releases the shared drillBuf1 slice
+
+      childAllocator1.close();
+      childAllocator2.close();
+    }
+  }
+
+  /**
+   * Shares a buffer with a second child allocator, transfers it to a third, and later
+   * transfers the result to a fourth, closing each child allocator once its buffer has been
+   * released. Both transfers are asserted to fit within the receiving allocator's limit.
+   */
+  @Test
+  public void testAllocator_transferShared() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+      final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION);
+      final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION);
+      final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION);
+
+      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+
+      boolean allocationFit;
+
+      DrillBuf drillBuf2 = drillBuf1.retain(childAllocator2);
+      rootAllocator.verify();
+      assertNotNull(drillBuf2);
+      assertNotEquals(drillBuf2, drillBuf1);
+
+      TransferResult result = drillBuf1.transferOwnership(childAllocator3);
+      allocationFit = result.allocationFit;
+      final DrillBuf drillBuf3 = result.buffer;
+      assertTrue(allocationFit);
+      rootAllocator.verify();
+
+      // Since childAllocator3 now has childAllocator1's buffer, childAllocator1 can be closed.
+      drillBuf1.release();
+      childAllocator1.close();
+      rootAllocator.verify();
+
+      drillBuf2.release();
+      childAllocator2.close();
+      rootAllocator.verify();
+
+      final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION);
+      TransferResult result2 = drillBuf3.transferOwnership(childAllocator4);
+      // Check the outcome of this second transfer, not the first one again.
+      allocationFit = result2.allocationFit;
+      final DrillBuf drillBuf4 = result2.buffer;
+      assertTrue(allocationFit);
+      rootAllocator.verify();
+
+      drillBuf3.release();
+      childAllocator3.close();
+      rootAllocator.verify();
+
+      drillBuf4.release();
+      childAllocator4.close();
+      rootAllocator.verify();
+    }
+  }
+
+  @Test
+  public void testAllocator_unclaimedReservation() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+      try (final BufferAllocator childAllocator1 =
+          rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
+        try(final AllocationReservation reservation = childAllocator1.newReservation()) {
+          assertTrue(reservation.add(64));
+        }
+        rootAllocator.verify();
+      }
+    }
+  }
+
+  @Test
+  public void testAllocator_claimedReservation() throws Exception {
+    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+
+      try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("claimedReservation", 0,
+          MAX_ALLOCATION)) {
+
+        try (final AllocationReservation reservation = childAllocator1.newReservation()) {
+          assertTrue(reservation.add(32));
+          assertTrue(reservation.add(32));
+
+          final DrillBuf drillBuf = reservation.allocateBuffer();
+          assertEquals(64, drillBuf.capacity());
+          rootAllocator.verify();
+
+          drillBuf.release();
+          rootAllocator.verify();
+        }
+        rootAllocator.verify();
+      }
+    }
+  }
+
+  /**
+   * Builds a three-level allocator tree (root, "fragment" children with reservations,
+   * per-operator grandchildren), allocates buffers from several grandchildren, and verifies
+   * the root allocator after each step before releasing and closing everything.
+   */
+  @Test
+  public void multiple() throws Exception {
+    final String owner = "test";
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+      // Reservation handed to every grandchild allocator below.
+      final int op = 100000;
+
+      BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE);
+      BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE);
+
+      allocator.verify();
+
+      BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+      DrillBuf b11 = allocator11.buffer(1000000);
+
+      allocator.verify();
+
+      BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+      DrillBuf b12 = allocator12.buffer(500000);
+
+      allocator.verify();
+
+      // NOTE(review): the name suggests a child of frag2, but this is created from frag1 - confirm intent.
+      BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+
+      allocator.verify();
+
+      BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE);
+      DrillBuf b22 = allocator22.buffer(2000000);
+
+      allocator.verify();
+
+      BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE);
+
+      allocator.verify();
+
+      BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE);
+      DrillBuf b31a = allocator31.buffer(200000);
+
+      allocator.verify();
+
+      // Previously running operator completes
+      b22.release();
+
+      allocator.verify();
+
+      allocator22.close();
+
+      b31a.release();
+      allocator31.close();
+
+      b12.release();
+      allocator12.close();
+
+      allocator21.close();
+
+      b11.release();
+      allocator11.close();
+
+      // Every grandchild is closed; now the fragment-level allocators can go.
+      frag1.close();
+      frag2.close();
+      frag3.close();
+
+    }
+  }
+}