You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ja...@apache.org on 2016/02/17 13:39:45 UTC

[10/17] arrow git commit: ARROW-1: Initial Arrow Code Commit

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
new file mode 100644
index 0000000..0db6144
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import static org.apache.arrow.memory.BaseAllocator.indent;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.arrow.memory.BaseAllocator.Verbosity;
+import org.apache.arrow.memory.util.AutoCloseableLock;
+import org.apache.arrow.memory.util.HistoricalLog;
+import org.apache.arrow.memory.util.Metrics;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason that this isn't package private is we're forced to put DrillBuf in Netty's package which need access
+ * to these objects or methods.
+ *
+ * Threading: AllocationManager manages thread-safety internally. Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads. Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocationManager instance. Important note, there is one AllocationManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocationManager should be very low.
+ *
+ */
+public class AllocationManager {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationManager.class);
+
+  private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
+  private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+  static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(Metrics.getInstance());
+
+  private final RootAllocator root;
+  private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
+  private final int size;
+  private final UnsafeDirectLittleEndian underlying;
+  private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
+  private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
+  private final long amCreationTime = System.nanoTime();
+
+  private volatile BufferLedger owningLedger;
+  private volatile long amDestructionTime = 0;
+
+  AllocationManager(BaseAllocator accountingAllocator, int size) {
+    Preconditions.checkNotNull(accountingAllocator);
+    accountingAllocator.assertOpen();
+
+    this.root = accountingAllocator.root;
+    this.underlying = INNER_ALLOCATOR.allocate(size);
+
+    // we do a no retain association since our creator will want to retrieve the newly created ledger and will create a
+    // reference count at that point
+    this.owningLedger = associate(accountingAllocator, false);
+    this.size = underlying.capacity();
+  }
+
+  /**
+   * Associate the existing underlying buffer with a new allocator. This will increase the reference count to the
+   * provided ledger by 1.
+   * @param allocator
+   *          The target allocator to associate this buffer with.
+   * @return The Ledger (new or existing) that associates the underlying buffer to this new ledger.
+   */
+  BufferLedger associate(final BaseAllocator allocator) {
+    return associate(allocator, true);
+  }
+
+  private BufferLedger associate(final BaseAllocator allocator, final boolean retain) {
+    allocator.assertOpen();
+
+    if (root != allocator.root) {
+      throw new IllegalStateException(
+          "A buffer can only be associated between two allocators that share the same root.");
+    }
+
+    try (AutoCloseableLock read = readLock.open()) {
+
+      final BufferLedger ledger = map.get(allocator);
+      if (ledger != null) {
+        if (retain) {
+          ledger.inc();
+        }
+        return ledger;
+      }
+
+    }
+    try (AutoCloseableLock write = writeLock.open()) {
+      // we have to recheck existing ledger since a second reader => writer could be competing with us.
+
+      final BufferLedger existingLedger = map.get(allocator);
+      if (existingLedger != null) {
+        if (retain) {
+          existingLedger.inc();
+        }
+        return existingLedger;
+      }
+
+      final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
+      if (retain) {
+        ledger.inc();
+      }
+      BufferLedger oldLedger = map.put(allocator, ledger);
+      Preconditions.checkArgument(oldLedger == null);
+      allocator.associateLedger(ledger);
+      return ledger;
+    }
+  }
+
+
+  /**
+   * The way that a particular BufferLedger communicates back to the AllocationManager that it now longer needs to hold
+   * a reference to particular piece of memory.
+   */
+  private class ReleaseListener {
+
+    private final BufferAllocator allocator;
+
+    public ReleaseListener(BufferAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    /**
+     * Can only be called when you already hold the writeLock.
+     */
+    public void release() {
+      allocator.assertOpen();
+
+      final BufferLedger oldLedger = map.remove(allocator);
+      oldLedger.allocator.dissociateLedger(oldLedger);
+
+      if (oldLedger == owningLedger) {
+        if (map.isEmpty()) {
+          // no one else owns, lets release.
+          oldLedger.allocator.releaseBytes(size);
+          underlying.release();
+          amDestructionTime = System.nanoTime();
+          owningLedger = null;
+        } else {
+          // we need to change the owning allocator. we've been removed so we'll get whatever is top of list
+          BufferLedger newLedger = map.values().iterator().next();
+
+          // we'll forcefully transfer the ownership and not worry about whether we exceeded the limit
+          // since this consumer can't do anything with this.
+          oldLedger.transferBalance(newLedger);
+        }
+      } else {
+        if (map.isEmpty()) {
+          throw new IllegalStateException("The final removal of a ledger should be connected to the owning ledger.");
+        }
+      }
+
+
+    }
+  }
+
+  /**
+   * The reference manager that binds an allocator manager to a particular BaseAllocator. Also responsible for creating
+   * a set of DrillBufs that share a common fate and set of reference counts.
+   * As with AllocationManager, the only reason this is public is due to DrillBuf being in io.netty.buffer package.
+   */
+  public class BufferLedger {
+
+    private final IdentityHashMap<ArrowBuf, Object> buffers =
+        BaseAllocator.DEBUG ? new IdentityHashMap<ArrowBuf, Object>() : null;
+
+    private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet(); // unique ID assigned to each ledger
+    private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can manage request for retain
+                                                                  // correctly
+    private final long lCreationTime = System.nanoTime();
+    private volatile long lDestructionTime = 0;
+    private final BaseAllocator allocator;
+    private final ReleaseListener listener;
+    private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
+        "BufferLedger[%d]", 1)
+        : null;
+
+    private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
+      this.allocator = allocator;
+      this.listener = listener;
+    }
+
+    /**
+     * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
+     * memory, no transfer is made to the new ledger.
+     * @param target
+     *          The ledger to transfer ownership account to.
+     * @return Whether transfer fit within target ledgers limits.
+     */
+    public boolean transferBalance(final BufferLedger target) {
+      Preconditions.checkNotNull(target);
+      Preconditions.checkArgument(allocator.root == target.allocator.root,
+          "You can only transfer between two allocators that share the same root.");
+      allocator.assertOpen();
+
+      target.allocator.assertOpen();
+      // if we're transferring to ourself, just return.
+      if (target == this) {
+        return true;
+      }
+
+      // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
+      // that this won't happen by synchronizing on the allocator manager instance.
+      try (AutoCloseableLock write = writeLock.open()) {
+        if (owningLedger != this) {
+          return true;
+        }
+
+        if (BaseAllocator.DEBUG) {
+          this.historicalLog.recordEvent("transferBalance(%s)", target.allocator.name);
+          target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
+        }
+
+        boolean overlimit = target.allocator.forceAllocate(size);
+        allocator.releaseBytes(size);
+        owningLedger = target;
+        return overlimit;
+      }
+
+    }
+
+    /**
+     * Print the current ledger state to a the provided StringBuilder.
+     * @param sb
+     *          The StringBuilder to populate.
+     * @param indent
+     *          The level of indentation to position the data.
+     * @param verbosity
+     *          The level of verbosity to print.
+     */
+    public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+      indent(sb, indent)
+          .append("ledger[")
+          .append(ledgerId)
+          .append("] allocator: ")
+          .append(allocator.name)
+          .append("), isOwning: ")
+          .append(owningLedger == this)
+          .append(", size: ")
+          .append(size)
+          .append(", references: ")
+          .append(bufRefCnt.get())
+          .append(", life: ")
+          .append(lCreationTime)
+          .append("..")
+          .append(lDestructionTime)
+          .append(", allocatorManager: [")
+          .append(AllocationManager.this.allocatorManagerId)
+          .append(", life: ")
+          .append(amCreationTime)
+          .append("..")
+          .append(amDestructionTime);
+
+      if (!BaseAllocator.DEBUG) {
+        sb.append("]\n");
+      } else {
+        synchronized (buffers) {
+          sb.append("] holds ")
+              .append(buffers.size())
+              .append(" buffers. \n");
+          for (ArrowBuf buf : buffers.keySet()) {
+            buf.print(sb, indent + 2, verbosity);
+            sb.append('\n');
+          }
+        }
+      }
+
+    }
+
+    private void inc() {
+      bufRefCnt.incrementAndGet();
+    }
+
+    /**
+     * Decrement the ledger's reference count. If the ledger is decremented to zero, this ledger should release its
+     * ownership back to the AllocationManager
+     */
+    public int decrement(int decrement) {
+      allocator.assertOpen();
+
+      final int outcome;
+      try (AutoCloseableLock write = writeLock.open()) {
+        outcome = bufRefCnt.addAndGet(-decrement);
+        if (outcome == 0) {
+          lDestructionTime = System.nanoTime();
+          listener.release();
+        }
+      }
+
+      return outcome;
+    }
+
+    /**
+     * Returns the ledger associated with a particular BufferAllocator. If the BufferAllocator doesn't currently have a
+     * ledger associated with this AllocationManager, a new one is created. This is placed on BufferLedger rather than
+     * AllocationManager directly because DrillBufs don't have access to AllocationManager and they are the ones
+     * responsible for exposing the ability to associate multiple allocators with a particular piece of underlying
+     * memory. Note that this will increment the reference count of this ledger by one to ensure the ledger isn't
+     * destroyed before use.
+     *
+     * @param allocator
+     * @return
+     */
+    public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
+      return associate((BaseAllocator) allocator);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocationManager and memory. Does not impact reference count.
+     * Typically used for slicing.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public ArrowBuf newDrillBuf(int offset, int length) {
+      allocator.assertOpen();
+      return newDrillBuf(offset, length, null);
+    }
+
+    /**
+     * Create a new DrillBuf associated with this AllocationManager and memory.
+     * @param offset
+     *          The offset in bytes to start this new DrillBuf.
+     * @param length
+     *          The length in bytes that this DrillBuf will provide access to.
+     * @param manager
+     *          An optional BufferManager argument that can be used to manage expansion of this DrillBuf
+     * @param retain
+     *          Whether or not the newly created buffer should get an additional reference count added to it.
+     * @return A new DrillBuf that shares references with all DrillBufs associated with this BufferLedger
+     */
+    public ArrowBuf newDrillBuf(int offset, int length, BufferManager manager) {
+      allocator.assertOpen();
+
+      final ArrowBuf buf = new ArrowBuf(
+          bufRefCnt,
+          this,
+          underlying,
+          manager,
+          allocator.getAsByteBufAllocator(),
+          offset,
+          length,
+          false);
+
+      if (BaseAllocator.DEBUG) {
+        historicalLog.recordEvent(
+            "DrillBuf(BufferLedger, BufferAllocator[%s], UnsafeDirectLittleEndian[identityHashCode == "
+                + "%d](%s)) => ledger hc == %d",
+            allocator.name, System.identityHashCode(buf), buf.toString(),
+            System.identityHashCode(this));
+
+        synchronized (buffers) {
+          buffers.put(buf, null);
+        }
+      }
+
+      return buf;
+
+    }
+
+    /**
+     * What is the total size (in bytes) of memory underlying this ledger.
+     *
+     * @return Size in bytes
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the
+     * memory or zero in the case that this is not the owning ledger associated with this memory.
+     *
+     * @return Amount of accounted(owned) memory associated with this ledger.
+     */
+    public int getAccountedSize() {
+      try (AutoCloseableLock read = readLock.open()) {
+        if (owningLedger == this) {
+          return size;
+        } else {
+          return 0;
+        }
+      }
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    UnsafeDirectLittleEndian getUnderlying() {
+      return underlying;
+    }
+
+    /**
+     * Package visible for debugging/verification only.
+     */
+    boolean isOwningLedger() {
+      return this == owningLedger;
+    }
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
new file mode 100644
index 0000000..68d1244
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationReservation.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Supports cumulative allocation reservation. Clients may increase the size of the reservation repeatedly until they
+ * call for an allocation of the current total size. The reservation can only be used once, and will throw an exception
+ * if it is used more than once.
+ * <p>
+ * For the purposes of airtight memory accounting, the reservation must be close()d whether it is used or not.
+ * This is not threadsafe.
+ */
+public interface AllocationReservation extends AutoCloseable {
+
+  /**
+   * Add to the current reservation.
+   *
+   * <p>Adding may fail if the allocator is not allowed to consume any more space.
+   *
+   * @param nBytes the number of bytes to add
+   * @return true if the addition is possible, false otherwise
+   * @throws IllegalStateException if called after buffer() is used to allocate the reservation
+   */
+  boolean add(final int nBytes);
+
+  /**
+   * Requests a reservation of additional space.
+   *
+   * <p>The implementation of the allocator's inner class provides this.
+   *
+   * @param nBytes the amount to reserve
+   * @return true if the reservation can be satisfied, false otherwise
+   */
+  boolean reserve(int nBytes);
+
+  /**
+   * Allocate a buffer whose size is the total of all the add()s made.
+   *
+   * <p>The allocation request can still fail, even if the amount of space
+   * requested is available, if the allocation cannot be made contiguously.
+   *
+   * @return the buffer, or null, if the request cannot be satisfied
+   * @throws IllegalStateException if called called more than once
+   */
+  ArrowBuf allocateBuffer();
+
+  /**
+   * Get the current size of the reservation (the sum of all the add()s).
+   *
+   * @return size of the current reservation
+   */
+  int getSize();
+
+  /**
+   * Return whether or not the reservation has been used.
+   *
+   * @return whether or not the reservation has been used
+   */
+  public boolean isUsed();
+
+  /**
+   * Return whether or not the reservation has been closed.
+   *
+   * @return whether or not the reservation has been closed
+   */
+  public boolean isClosed();
+
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
new file mode 100644
index 0000000..5664579
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorClosedException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+/**
+ * Exception thrown when a closed BufferAllocator is used. Note
+ * this is an unchecked exception.
+ *
+ * @param message string associated with the cause
+ */
+@SuppressWarnings("serial")
+public class AllocatorClosedException extends RuntimeException {
+  public AllocatorClosedException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
new file mode 100644
index 0000000..72f77ab
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.memory.AllocationManager.BufferLedger;
+import org.apache.arrow.memory.util.AssertionUtil;
+import org.apache.arrow.memory.util.HistoricalLog;
+
+import com.google.common.base.Preconditions;
+
+public abstract class BaseAllocator extends Accountant implements BufferAllocator {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+
+  public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
+
+  private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+  private static final int CHUNK_SIZE = AllocationManager.INNER_ALLOCATOR.getChunkSize();
+
+  public static final int DEBUG_LOG_LENGTH = 6;
+  public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
+      || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR, "false"));
+  private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
+
+  private final BaseAllocator parentAllocator;
+  private final ByteBufAllocator thisAsByteBufAllocator;
+  private final IdentityHashMap<BaseAllocator, Object> childAllocators;
+  private final ArrowBuf empty;
+
+  private volatile boolean isClosed = false; // the allocator has been closed
+
+  // Package exposed for sharing between AllocatorManger and BaseAllocator objects
+  final String name;
+  final RootAllocator root;
+
+  // members used purely for debugging
+  private final IdentityHashMap<BufferLedger, Object> childLedgers;
+  private final IdentityHashMap<Reservation, Object> reservations;
+  private final HistoricalLog historicalLog;
+
+  protected BaseAllocator(
+      final BaseAllocator parentAllocator,
+      final String name,
+      final long initReservation,
+      final long maxAllocation) throws OutOfMemoryException {
+    super(parentAllocator, initReservation, maxAllocation);
+
+    if (parentAllocator != null) {
+      this.root = parentAllocator.root;
+      empty = parentAllocator.empty;
+    } else if (this instanceof RootAllocator) {
+      this.root = (RootAllocator) this;
+      empty = createEmpty();
+    } else {
+      throw new IllegalStateException("An parent allocator must either carry a root or be the root.");
+    }
+
+    this.parentAllocator = parentAllocator;
+    this.name = name;
+
+    this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
+
+    if (DEBUG) {
+      childAllocators = new IdentityHashMap<>();
+      reservations = new IdentityHashMap<>();
+      childLedgers = new IdentityHashMap<>();
+      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%s]", name);
+      hist("created by \"%s\", owned = %d", name, this.getAllocatedMemory());
+    } else {
+      childAllocators = null;
+      reservations = null;
+      historicalLog = null;
+      childLedgers = null;
+    }
+
+  }
+
+  public void assertOpen() {
+    if (AssertionUtil.ASSERT_ENABLED) {
+      if (isClosed) {
+        throw new IllegalStateException("Attempting operation on allocator when allocator is closed.\n"
+            + toVerboseString());
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public ArrowBuf getEmpty() {
+    assertOpen();
+    return empty;
+  }
+
+  /**
+   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we have a new ledger
+   * associated with this allocator.
+   */
+  void associateLedger(BufferLedger ledger) {
+    assertOpen();
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        childLedgers.put(ledger, null);
+      }
+    }
+  }
+
+  /**
+   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we are removing a
+   * ledger associated with this allocator
+   */
+  void dissociateLedger(BufferLedger ledger) {
+    assertOpen();
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        if (!childLedgers.containsKey(ledger)) {
+          throw new IllegalStateException("Trying to remove a child ledger that doesn't exist.");
+        }
+        childLedgers.remove(ledger);
+      }
+    }
+  }
+
+  /**
+   * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes.
+   *
+   * @param childAllocator
+   *          The child allocator that has been closed.
+   */
+  private void childClosed(final BaseAllocator childAllocator) {
+    assertOpen();
+
+    if (DEBUG) {
+      Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
+
+      synchronized (DEBUG_LOCK) {
+        final Object object = childAllocators.remove(childAllocator);
+        if (object == null) {
+          childAllocator.historicalLog.logHistory(logger);
+          throw new IllegalStateException("Child allocator[" + childAllocator.name
+              + "] not found in parent allocator[" + name + "]'s childAllocators");
+        }
+      }
+    }
+  }
+
+  private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final int requested) {
+    if (rounded != requested) {
+      return String.format(
+          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d",
+          rounded, requested, allocator.getAllocatedMemory());
+    } else {
+      return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
+          rounded, allocator.getAllocatedMemory());
+    }
+  }
+
+  @Override
+  public ArrowBuf buffer(final int initialRequestSize) {
+    assertOpen();
+
+    return buffer(initialRequestSize, null);
+  }
+
+  private ArrowBuf createEmpty(){
+    assertOpen();
+
+    return new ArrowBuf(new AtomicInteger(), null, AllocationManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+  }
+
+  @Override
+  public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) {
+    assertOpen();
+
+    Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
+
+    if (initialRequestSize == 0) {
+      return empty;
+    }
+
+    // round to next largest power of two if we're within a chunk since that is how our allocator operates
+    final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
+        nextPowerOfTwo(initialRequestSize)
+        : initialRequestSize;
+    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
+    if (!outcome.isOk()) {
+      throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
+    }
+
+    boolean success = false;
+    try {
+      ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
+      success = true;
+      return buffer;
+    } finally {
+      if (!success) {
+        releaseBytes(actualRequestSize);
+      }
+    }
+
+  }
+
+  /**
+   * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical accounting associated
+   * with creating a new buffer.
+   */
+  private ArrowBuf bufferWithoutReservation(final int size, BufferManager bufferManager) throws OutOfMemoryException {
+    assertOpen();
+
+    final AllocationManager manager = new AllocationManager(this, size);
+    final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
+    final ArrowBuf buffer = ledger.newDrillBuf(0, size, bufferManager);
+
+    // make sure that our allocation is equal to what we expected.
+    Preconditions.checkArgument(buffer.capacity() == size,
+        "Allocated capacity %d was not equal to requested capacity %d.", buffer.capacity(), size);
+
+    return buffer;
+  }
+
+  @Override
+  public ByteBufAllocator getAsByteBufAllocator() {
+    return thisAsByteBufAllocator;
+  }
+
+  @Override
+  public BufferAllocator newChildAllocator(
+      final String name,
+      final long initReservation,
+      final long maxAllocation) {
+    assertOpen();
+
+    final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, maxAllocation);
+
+    if (DEBUG) {
+      synchronized (DEBUG_LOCK) {
+        childAllocators.put(childAllocator, childAllocator);
+        historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, childAllocator.name);
+      }
+    }
+
+    return childAllocator;
+  }
+
+  public class Reservation implements AllocationReservation {
+    private int nBytes = 0;
+    private boolean used = false;
+    private boolean closed = false;
+    private final HistoricalLog historicalLog;
+
+    public Reservation() {
+      if (DEBUG) {
+        historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System.identityHashCode(this));
+        historicalLog.recordEvent("created");
+        synchronized (DEBUG_LOCK) {
+          reservations.put(this, this);
+        }
+      } else {
+        historicalLog = null;
+      }
+    }
+
+    public boolean add(final int nBytes) {
+      assertOpen();
+
+      Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
+      Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
+      Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
+
+      // we round up to next power of two since all reservations are done in powers of two. This may overestimate the
+      // preallocation since someone may perceive additions to be power of two. If this becomes a problem, we can look
+      // at
+      // modifying this behavior so that we maintain what we reserve and what the user asked for and make sure to only
+      // round to power of two as necessary.
+      final int nBytesTwo = BaseAllocator.nextPowerOfTwo(nBytes);
+      if (!reserve(nBytesTwo)) {
+        return false;
+      }
+
+      this.nBytes += nBytesTwo;
+      return true;
+    }
+
+    public ArrowBuf allocateBuffer() {
+      assertOpen();
+
+      Preconditions.checkState(!closed, "Attempt to allocate after closed");
+      Preconditions.checkState(!used, "Attempt to allocate more than once");
+
+      final ArrowBuf drillBuf = allocate(nBytes);
+      used = true;
+      return drillBuf;
+    }
+
+    public int getSize() {
+      return nBytes;
+    }
+
+    public boolean isUsed() {
+      return used;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() {
+      assertOpen();
+
+      if (closed) {
+        return;
+      }
+
+      if (DEBUG) {
+        if (!isClosed()) {
+          final Object object;
+          synchronized (DEBUG_LOCK) {
+            object = reservations.remove(this);
+          }
+          if (object == null) {
+            final StringBuilder sb = new StringBuilder();
+            print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+            logger.debug(sb.toString());
+            throw new IllegalStateException(
+                String.format("Didn't find closing reservation[%d]", System.identityHashCode(this)));
+          }
+
+          historicalLog.recordEvent("closed");
+        }
+      }
+
+      if (!used) {
+        releaseReservation(nBytes);
+      }
+
+      closed = true;
+    }
+
+    public boolean reserve(int nBytes) {
+      assertOpen();
+
+      final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
+
+      if (DEBUG) {
+        historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk()));
+      }
+
+      return outcome.isOk();
+    }
+
+    /**
+     * Allocate the a buffer of the requested size.
+     *
+     * <p>
+     * The implementation of the allocator's inner class provides this.
+     *
+     * @param nBytes
+     *          the size of the buffer requested
+     * @return the buffer, or null, if the request cannot be satisfied
+     */
+    private ArrowBuf allocate(int nBytes) {
+      assertOpen();
+
+      boolean success = false;
+
+      /*
+       * The reservation already added the requested bytes to the allocators owned and allocated bytes via reserve().
+       * This ensures that they can't go away. But when we ask for the buffer here, that will add to the allocated bytes
+       * as well, so we need to return the same number back to avoid double-counting them.
+       */
+      try {
+        final ArrowBuf drillBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+
+        if (DEBUG) {
+          historicalLog.recordEvent("allocate() => %s", String.format("DrillBuf[%d]", drillBuf.getId()));
+        }
+        success = true;
+        return drillBuf;
+      } finally {
+        if (!success) {
+          releaseBytes(nBytes);
+        }
+      }
+    }
+
+    /**
+     * Return the reservation back to the allocator without having used it.
+     *
+     * @param nBytes
+     *          the size of the reservation
+     */
+    private void releaseReservation(int nBytes) {
+      assertOpen();
+
+      releaseBytes(nBytes);
+
+      if (DEBUG) {
+        historicalLog.recordEvent("releaseReservation(%d)", nBytes);
+      }
+    }
+
+  }
+
+  @Override
+  public AllocationReservation newReservation() {
+    assertOpen();
+
+    return new Reservation();
+  }
+
+
+  @Override
+  public synchronized void close() {
+    /*
+     * Some owners may close more than once because of complex cleanup and shutdown
+     * procedures.
+     */
+    if (isClosed) {
+      return;
+    }
+
+    isClosed = true;
+
+    if (DEBUG) {
+      synchronized(DEBUG_LOCK) {
+        verifyAllocator();
+
+        // are there outstanding child allocators?
+        if (!childAllocators.isEmpty()) {
+          for (final BaseAllocator childAllocator : childAllocators.keySet()) {
+            if (childAllocator.isClosed) {
+              logger.warn(String.format(
+                  "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s",
+                  childAllocator.name, name, toString()));
+            }
+          }
+
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, toString()));
+        }
+
+        // are there outstanding buffers?
+        final int allocatedCount = childLedgers.size();
+        if (allocatedCount > 0) {
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
+                  name, allocatedCount, toString()));
+        }
+
+        if (reservations.size() != 0) {
+          throw new IllegalStateException(
+              String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
+                  toString()));
+        }
+
+      }
+    }
+
+    // Is there unaccounted-for outstanding allocation?
+    final long allocated = getAllocatedMemory();
+    if (allocated > 0) {
+      throw new IllegalStateException(
+          String.format("Memory was leaked by query. Memory leaked: (%d)\n%s", allocated, toString()));
+    }
+
+    // we need to release our memory to our parent before we tell it we've closed.
+    super.close();
+
+    // Inform our parent allocator that we've closed
+    if (parentAllocator != null) {
+      parentAllocator.childClosed(this);
+    }
+
+    if (DEBUG) {
+      historicalLog.recordEvent("closed");
+      logger.debug(String.format(
+          "closed allocator[%s].",
+          name));
+    }
+
+
+  }
+
+  public String toString() {
+    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
+        : Verbosity.BASIC;
+    final StringBuilder sb = new StringBuilder();
+    print(sb, 0, verbosity);
+    return sb.toString();
+  }
+
+  /**
+   * Provide a verbose string of the current allocator state. Includes the state of all child allocators, along with
+   * historical logs of each object and including stacktraces.
+   *
+   * @return A Verbose string of current allocator state.
+   */
+  public String toVerboseString() {
+    final StringBuilder sb = new StringBuilder();
+    print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+    return sb.toString();
+  }
+
+  private void hist(String noteFormat, Object... args) {
+    historicalLog.recordEvent(noteFormat, args);
+  }
+
+  /**
+   * Rounds up the provided value to the nearest power of two.
+   *
+   * @param val
+   *          An integer value.
+   * @return The closest power of two of that value.
+   */
+  static int nextPowerOfTwo(int val) {
+    int highestBit = Integer.highestOneBit(val);
+    if (highestBit == val) {
+      return val;
+    } else {
+      return highestBit << 1;
+    }
+  }
+
+
+  /**
+   * Verifies the accounting state of the allocator. Only works for DEBUG.
+   *
+   * @throws IllegalStateException
+   *           when any problems are found
+   */
+  void verifyAllocator() {
+    final IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator> buffersSeen = new IdentityHashMap<>();
+    verifyAllocator(buffersSeen);
+  }
+
+  /**
+   * Verifies the accounting state of the allocator. Only works for DEBUG.
+   *
+   * <p>
+   * This overload is used for recursive calls, allowing for checking that DrillBufs are unique across all allocators
+   * that are checked.
+   * </p>
+   *
+   * @param buffersSeen
+   *          a map of buffers that have already been seen when walking a tree of allocators
+   * @throws IllegalStateException
+   *           when any problems are found
+   */
+  private void verifyAllocator(final IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator> buffersSeen) {
+    synchronized (DEBUG_LOCK) {
+
+      // The remaining tests can only be performed if we're in debug mode.
+      if (!DEBUG) {
+        return;
+      }
+
+      final long allocated = getAllocatedMemory();
+
+      // verify my direct descendants
+      final Set<BaseAllocator> childSet = childAllocators.keySet();
+      for (final BaseAllocator childAllocator : childSet) {
+        childAllocator.verifyAllocator(buffersSeen);
+      }
+
+      /*
+       * Verify my relationships with my descendants.
+       *
+       * The sum of direct child allocators' owned memory must be <= my allocated memory; my allocated memory also
+       * includes DrillBuf's directly allocated by me.
+       */
+      long childTotal = 0;
+      for (final BaseAllocator childAllocator : childSet) {
+        childTotal += Math.max(childAllocator.getAllocatedMemory(), childAllocator.reservation);
+      }
+      if (childTotal > getAllocatedMemory()) {
+        historicalLog.logHistory(logger);
+        logger.debug("allocator[" + name + "] child event logs BEGIN");
+        for (final BaseAllocator childAllocator : childSet) {
+          childAllocator.historicalLog.logHistory(logger);
+        }
+        logger.debug("allocator[" + name + "] child event logs END");
+        throw new IllegalStateException(
+            "Child allocators own more memory (" + childTotal + ") than their parent (name = "
+                + name + " ) has allocated (" + getAllocatedMemory() + ')');
+      }
+
+      // Furthermore, the amount I've allocated should be that plus buffers I've allocated.
+      long bufferTotal = 0;
+
+      final Set<BufferLedger> ledgerSet = childLedgers.keySet();
+      for (final BufferLedger ledger : ledgerSet) {
+        if (!ledger.isOwningLedger()) {
+          continue;
+        }
+
+        final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+        /*
+         * Even when shared, DrillBufs are rewrapped, so we should never see the same instance twice.
+         */
+        final BaseAllocator otherOwner = buffersSeen.get(udle);
+        if (otherOwner != null) {
+          throw new IllegalStateException("This allocator's drillBuf already owned by another allocator");
+        }
+        buffersSeen.put(udle, this);
+
+        bufferTotal += udle.capacity();
+      }
+
+      // Preallocated space has to be accounted for
+      final Set<Reservation> reservationSet = reservations.keySet();
+      long reservedTotal = 0;
+      for (final Reservation reservation : reservationSet) {
+        if (!reservation.isUsed()) {
+          reservedTotal += reservation.getSize();
+        }
+      }
+
+      if (bufferTotal + reservedTotal + childTotal != getAllocatedMemory()) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("allocator[");
+        sb.append(name);
+        sb.append("]\nallocated: ");
+        sb.append(Long.toString(allocated));
+        sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
+        sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+        sb.append('\n');
+
+        if (bufferTotal != 0) {
+          sb.append("buffer total: ");
+          sb.append(Long.toString(bufferTotal));
+          sb.append('\n');
+          dumpBuffers(sb, ledgerSet);
+        }
+
+        if (childTotal != 0) {
+          sb.append("child total: ");
+          sb.append(Long.toString(childTotal));
+          sb.append('\n');
+
+          for (final BaseAllocator childAllocator : childSet) {
+            sb.append("child allocator[");
+            sb.append(childAllocator.name);
+            sb.append("] owned ");
+            sb.append(Long.toString(childAllocator.getAllocatedMemory()));
+            sb.append('\n');
+          }
+        }
+
+        if (reservedTotal != 0) {
+          sb.append(String.format("reserved total : %d bytes.", reservedTotal));
+          for (final Reservation reservation : reservationSet) {
+            reservation.historicalLog.buildHistory(sb, 0, true);
+            sb.append('\n');
+          }
+        }
+
+        logger.debug(sb.toString());
+
+        final long allocated2 = getAllocatedMemory();
+
+        if (allocated2 != allocated) {
+          throw new IllegalStateException(String.format(
+              "allocator[%s]: allocated t1 (%d) + allocated t2 (%d). Someone released memory while in verification.",
+              name, allocated, allocated2));
+
+        }
+        throw new IllegalStateException(String.format(
+            "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
+            name, bufferTotal, reservedTotal, childTotal, allocated));
+      }
+    }
+  }
+
+  void print(StringBuilder sb, int level, Verbosity verbosity) {
+
+    indent(sb, level)
+        .append("Allocator(")
+        .append(name)
+        .append(") ")
+        .append(reservation)
+        .append('/')
+        .append(getAllocatedMemory())
+        .append('/')
+        .append(getPeakMemoryAllocation())
+        .append('/')
+        .append(getLimit())
+        .append(" (res/actual/peak/limit)")
+        .append('\n');
+
+    if (DEBUG) {
+      indent(sb, level + 1).append(String.format("child allocators: %d\n", childAllocators.size()));
+      for (BaseAllocator child : childAllocators.keySet()) {
+        child.print(sb, level + 2, verbosity);
+      }
+
+      indent(sb, level + 1).append(String.format("ledgers: %d\n", childLedgers.size()));
+      for (BufferLedger ledger : childLedgers.keySet()) {
+        ledger.print(sb, level + 2, verbosity);
+      }
+
+      final Set<Reservation> reservations = this.reservations.keySet();
+      indent(sb, level + 1).append(String.format("reservations: %d\n", reservations.size()));
+      for (final Reservation reservation : reservations) {
+        if (verbosity.includeHistoricalLog) {
+          reservation.historicalLog.buildHistory(sb, level + 3, true);
+        }
+      }
+
+    }
+
+  }
+
+  private void dumpBuffers(final StringBuilder sb, final Set<BufferLedger> ledgerSet) {
+    for (final BufferLedger ledger : ledgerSet) {
+      if (!ledger.isOwningLedger()) {
+        continue;
+      }
+      final UnsafeDirectLittleEndian udle = ledger.getUnderlying();
+      sb.append("UnsafeDirectLittleEndian[dentityHashCode == ");
+      sb.append(Integer.toString(System.identityHashCode(udle)));
+      sb.append("] size ");
+      sb.append(Integer.toString(udle.capacity()));
+      sb.append('\n');
+    }
+  }
+
+
+  public static StringBuilder indent(StringBuilder sb, int indent) {
+    final char[] indentation = new char[indent * 2];
+    Arrays.fill(indentation, ' ');
+    sb.append(indentation);
+    return sb;
+  }
+
+  public static enum Verbosity {
+    BASIC(false, false), // only include basic information
+    LOG(true, false), // include basic
+    LOG_WITH_STACKTRACE(true, true) //
+    ;
+
+    public final boolean includeHistoricalLog;
+    public final boolean includeStackTraces;
+
+    Verbosity(boolean includeHistoricalLog, boolean includeStackTraces) {
+      this.includeHistoricalLog = includeHistoricalLog;
+      this.includeStackTraces = includeStackTraces;
+    }
+  }
+
+  public static boolean isDebug() {
+    return DEBUG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
new file mode 100644
index 0000000..4e88c73
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BoundsChecking.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+public class BoundsChecking {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class);
+
+  public static final boolean BOUNDS_CHECKING_ENABLED;
+
+  static {
+    boolean isAssertEnabled = false;
+    assert isAssertEnabled = true;
+    BOUNDS_CHECKING_ENABLED = isAssertEnabled
+        || !"true".equals(System.getProperty("drill.enable_unsafe_memory_access"));
+  }
+
+  private BoundsChecking() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
new file mode 100644
index 0000000..16a6812
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
+ */
+public interface BufferAllocator extends AutoCloseable {
+  /**
+   * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
+   * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
+   *
+   * @param size
+   *          The size in bytes.
+   * @return a new DrillBuf, or null if the request can't be satisfied
+   * @throws OutOfMemoryException
+   *           if buffer cannot be allocated
+   */
+  public ArrowBuf buffer(int size);
+
+  /**
+   * Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the
+   * requested size for rounding purposes. However, the buffer's capacity will be set to the configured size.
+   *
+   * @param size
+   *          The size in bytes.
+   * @param manager
+   *          A buffer manager to manage reallocation.
+   * @return a new DrillBuf, or null if the request can't be satisfied
+   * @throws OutOfMemoryException
+   *           if buffer cannot be allocated
+   */
+  public ArrowBuf buffer(int size, BufferManager manager);
+
+  /**
+   * Returns the allocator this allocator falls back to when it needs more memory.
+   *
+   * @return the underlying allocator used by this allocator
+   */
+  public ByteBufAllocator getAsByteBufAllocator();
+
+  /**
+   * Create a new child allocator.
+   *
+   * @param name
+   *          the name of the allocator.
+   * @param initReservation
+   *          the initial space reservation (obtained from this allocator)
+   * @param maxAllocation
+   *          maximum amount of space the new allocator can allocate
+   * @return the new allocator, or null if it can't be created
+   */
+  public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation);
+
+  /**
+   * Close and release all buffers generated from this buffer pool.
+   *
+   * <p>When assertions are on, complains if there are any outstanding buffers; to avoid
+   * that, release all buffers before the allocator is closed.
+   */
+  @Override
+  public void close();
+
+  /**
+   * Returns the amount of memory currently allocated from this allocator.
+   *
+   * @return the amount of memory currently allocated
+   */
+  public long getAllocatedMemory();
+
+  /**
+   * Set the maximum amount of memory this allocator is allowed to allocate.
+   *
+   * @param newLimit
+   *          The new Limit to apply to allocations
+   */
+  public void setLimit(long newLimit);
+
+  /**
+   * Return the current maximum limit this allocator imposes.
+   *
+   * @return Limit in number of bytes.
+   */
+  public long getLimit();
+
+  /**
+   * Returns the peak amount of memory allocated from this allocator.
+   *
+   * @return the peak amount of memory allocated
+   */
+  public long getPeakMemoryAllocation();
+
+  /**
+   * Create an allocation reservation. A reservation is a way of building up
+   * a request for a buffer whose size is not known in advance. See
+   * {@see AllocationReservation}.
+   *
+   * @return the newly created reservation
+   */
+  public AllocationReservation newReservation();
+
+  /**
+   * Get a reference to the empty buffer associated with this allocator. Empty buffers are special because we don't
+   * worry about them leaking or managing reference counts on them since they don't actually point to any memory.
+   */
+  public ArrowBuf getEmpty();
+
+  /**
+   * Return the name of this allocator. This is a human readable name that can help debugging. Typically provides
+   * coordinates about where this allocator was created
+   */
+  public String getName();
+
+  /**
+   * Return whether or not this allocator (or one if its parents) is over its limits. In the case that an allocator is
+   * over its limit, all consumers of that allocator should aggressively try to addrss the overlimit situation.
+   */
+  public boolean isOverLimit();
+
+  /**
+   * Return a verbose string describing this allocator. If in DEBUG mode, this will also include relevant stacktraces
+   * and historical logs for underlying objects
+   *
+   * @return A very verbose description of the allocator hierarchy.
+   */
+  public String toVerboseString();
+
+  /**
+   * Asserts (using java assertions) that the provided allocator is currently open. If assertions are disabled, this is
+   * a no-op.
+   */
+  public void assertOpen();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
new file mode 100644
index 0000000..0610ff0
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferManager.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.memory;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Manages a list of {@link ArrowBuf}s that can be reallocated as needed. Upon
+ * re-allocation the old buffer will be freed. Managing a list of these buffers
+ * prevents some parts of the system from needing to define a correct location
+ * to place the final call to free them.
+ *
+ * The current uses of these types of buffers are within the pluggable components of Drill.
+ * In UDFs, memory management should not be a concern. We provide access to re-allocatable
+ * DrillBufs to give UDF writers general purpose buffers we can account for. To prevent the need
+ * for UDFs to contain boilerplate to close all of the buffers they request, this list
+ * is tracked at a higher level and all of the buffers are freed once we are sure that
+ * the code depending on them is done executing (currently {@link FragmentContext}
+ * and {@link QueryContext}.
+ */
+public interface BufferManager extends AutoCloseable {
+
+  /**
+   * Replace an old buffer with a new version at least of the provided size. Does not copy data.
+   *
+   * @param old
+   *          Old Buffer that the user is no longer going to use.
+   * @param newSize
+   *          Size of new replacement buffer.
+   * @return
+   */
+  public ArrowBuf replace(ArrowBuf old, int newSize);
+
+  /**
+   * Get a managed buffer of indeterminate size.
+   *
+   * @return A buffer.
+   */
+  public ArrowBuf getManagedBuffer();
+
+  /**
+   * Get a managed buffer of at least a certain size.
+   *
+   * @param size
+   *          The desired size
+   * @return A buffer
+   */
+  public ArrowBuf getManagedBuffer(int size);
+
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
new file mode 100644
index 0000000..6f120e5
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+
+/**
+ * Child allocator class. Only slightly different from the {@see RootAllocator},
+ * in that these can't be created directly, but must be obtained from
+ * {@see BufferAllocator#newChildAllocator(AllocatorOwner, long, long, int)}.
+
+ * <p>Child allocators can only be created by the root, or other children, so
+ * this class is package private.</p>
+ */
+class ChildAllocator extends BaseAllocator {
+  /**
+   * Constructor.
+   *
+   * @param parentAllocator parent allocator -- the one creating this child
+   * @param allocatorOwner a handle to the object making the request
+   * @param allocationPolicy the allocation policy to use; the policy for all
+   *   allocators must match for each invocation of a drillbit
+   * @param initReservation initial amount of space to reserve (obtained from the parent)
+   * @param maxAllocation maximum amount of space that can be obtained from this allocator;
+   *   note this includes direct allocations (via {@see BufferAllocator#buffer(int, int)}
+   *   et al) and requests from descendant allocators. Depending on the allocation policy in
+   *   force, even less memory may be available
+   * @param flags one or more of BaseAllocator.F_* flags
+   */
+  ChildAllocator(
+      BaseAllocator parentAllocator,
+      String name,
+      long initReservation,
+      long maxAllocation) {
+    super(parentAllocator, name, initReservation, maxAllocation);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/DrillByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/DrillByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/DrillByteBufAllocator.java
new file mode 100644
index 0000000..23d6448
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/DrillByteBufAllocator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.ExpandableByteBuf;
+
+/**
+ * An implementation of ByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
+ * and managed using Drill's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is
+ * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
+ * DrillBufs to be expandable.
+ */
+public class DrillByteBufAllocator implements ByteBufAllocator {
+
+  private static final int DEFAULT_BUFFER_SIZE = 4096;
+  private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
+
+  private final BufferAllocator allocator;
+
+  public DrillByteBufAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public ByteBuf buffer() {
+    return buffer(DEFAULT_BUFFER_SIZE);
+  }
+
+  @Override
+  public ByteBuf buffer(int initialCapacity) {
+    return new ExpandableByteBuf(allocator.buffer(initialCapacity), allocator);
+  }
+
+  @Override
+  public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity);
+  }
+
+  @Override
+  public ByteBuf ioBuffer() {
+    return buffer();
+  }
+
+  @Override
+  public ByteBuf ioBuffer(int initialCapacity) {
+    return buffer(initialCapacity);
+  }
+
+  @Override
+  public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity);
+  }
+
+  @Override
+  public ByteBuf directBuffer() {
+    return buffer();
+  }
+
+  @Override
+  public ByteBuf directBuffer(int initialCapacity) {
+    return allocator.buffer(initialCapacity);
+  }
+
+  @Override
+  public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public CompositeByteBuf compositeBuffer() {
+    return compositeBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
+  }
+
+  @Override
+  public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+    return new CompositeByteBuf(this, true, maxNumComponents);
+  }
+
+  @Override
+  public CompositeByteBuf compositeDirectBuffer() {
+    return compositeBuffer();
+  }
+
+  @Override
+  public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+    return compositeBuffer(maxNumComponents);
+  }
+
+  @Override
+  public boolean isDirectBufferPooled() {
+    return false;
+  }
+
+  @Override
+  public ByteBuf heapBuffer() {
+    throw fail();
+  }
+
+  @Override
+  public ByteBuf heapBuffer(int initialCapacity) {
+    throw fail();
+  }
+
+  @Override
+  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+    throw fail();
+  }
+
+  @Override
+  public CompositeByteBuf compositeHeapBuffer() {
+    throw fail();
+  }
+
+  @Override
+  public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+    throw fail();
+  }
+
+  private RuntimeException fail() {
+    throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java b/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java
new file mode 100644
index 0000000..6ba0284
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/OutOfMemoryException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+
+public class OutOfMemoryException extends RuntimeException {
+  private static final long serialVersionUID = -6858052345185793382L;
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutOfMemoryException.class);
+
+  public OutOfMemoryException() {
+    super();
+  }
+
+  public OutOfMemoryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public OutOfMemoryException(String message, Throwable cause) {
+    super(message, cause);
+
+  }
+
+  public OutOfMemoryException(String message) {
+    super(message);
+
+  }
+
+  public OutOfMemoryException(Throwable cause) {
+    super(cause);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/README.md
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/README.md b/java/memory/src/main/java/org/apache/arrow/memory/README.md
new file mode 100644
index 0000000..09e4257
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/README.md
@@ -0,0 +1,121 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# Memory: Allocation, Accounting and Management
+ 
+The memory management package contains all the memory allocation related items that Arrow uses to manage memory.
+
+
+## Key Components
+Memory management can be broken into the following main components:
+
+- Memory chunk allocation and fragmentation management
+  - `PooledByteBufAllocatorL` - A LittleEndian clone of Netty's jemalloc implementation
+  - `UnsafeDirectLittleEndian` - A base level memory access interface
+  - `LargeBuffer` - A buffer backing implementation used when working with data larger than one Netty chunk (default to 16mb)
+- Memory limits & Accounting
+  - `Accountant` - A nestable class of lockfree memory accountors.
+- Application-level memory allocation
+  - `BufferAllocator` - The public interface application users should be leveraging
+  - `BaseAllocator` - The base implementation of memory allocation, contains the meat of our the Arrow allocator implementation
+  - `RootAllocator` - The root allocator. Typically only one created for a JVM
+  - `ChildAllocator` - A child allocator that derives from the root allocator
+- Buffer ownership and transfer capabilities
+  - `AllocationManager` - Responsible for managing the relationship between multiple allocators and a single chunk of memory
+  - `BufferLedger` - Responsible for allowing maintaining the relationship between an `AllocationManager`, a `BufferAllocator` and one or more individual `ArrowBuf`s 
+- Memory access
+  - `ArrowBuf` - The facade for interacting directly with a chunk of memory.
+ 
+
+## Memory Management Overview
+Arrow's memory model is based on the following basic concepts:
+
+ - Memory can be allocated up to some limit. That limit could be a real limit (OS/JVM) or a locally imposed limit.
+ - Allocation operates in two phases: accounting then actual allocation. Allocation could fail at either point.
+ - Allocation failure should be recoverable. In all cases, the Allocator infrastructure should expose memory allocation failures (OS or internal limit-based) as `OutOfMemoryException`s.
+ - Any allocator can reserve memory when created. This memory shall be held such that this allocator will always be able to allocate that amount of memory.
+ - A particular application component should work to use a local allocator to understand local memory usage and better debug memory leaks.
+ - The same physical memory can be shared by multiple allocators and the allocator must provide an accounting paradigm for this purpose.
+
+## Allocator Trees
+
+Arrow provides a tree-based model for memory allocation. The RootAllocator is created first, then all allocators are created as children of that allocator. The RootAllocator is responsible for being the master bookeeper for memory allocations. All other allocators are created as children of this tree. Each allocator can first determine whether it has enough local memory to satisfy a particular request. If not, the allocator can ask its parent for an additional memory allocation.
+
+## Reserving Memory
+
+Arrow provides two different ways to reserve memory:
+
+  - BufferAllocator accounting reservations: 
+      When a new allocator (other than the `RootAllocator`) is initialized, it can set aside memory that it will keep locally for its lifetime. This is memory that will never be released back to its parent allocator until the allocator is closed.
+  - `AllocationReservation` via BufferAllocator.newReservation(): Allows a short-term preallocation strategy so that a particular subsystem can ensure future memory is available to support a particular request.
+  
+## Memory Ownership, Reference Counts and Sharing
+Many BufferAllocators can reference the same piece of memory at the same time. The most common situation for this is in the case of a Broadcast Join: in this situation many downstream operators in the same Arrowbit will receive the same physical memory. Each of these operators will be operating within its own Allocator context. We therefore have multiple allocators all pointing at the same physical memory. It is the AllocationManager's responsibility to ensure that in this situation, that all memory is accurately accounted for from the Root's perspective and also to ensure that the memory is correctly released once all BufferAllocators have stopped using that memory.
+
+For simplicity of accounting, we treat that memory as being used by one of the BufferAllocators associated with the memory. When that allocator releases its claim on that memory, the memory ownership is then moved to another BufferLedger belonging to the same AllocationManager. Note that because a ArrowBuf.release() is what actually causes memory ownership transfer to occur, we always precede with ownership transfer (even if that violates an allocator limit). It is the responsibility of the application owning a particular allocator to frequently confirm whether the allocator is over its memory limit (BufferAllocator.isOverLimit()) and if so, attempt to aggresively release memory to ameliorate the situation.
+
+All ArrowBufs (direct or sliced) related to a single BufferLedger/BufferAllocator combination share the same reference count and either all will be valid or all will be invalid.
+
+## Object Hierarchy
+
+There are two main ways that someone can look at the object hierarchy for Arrow's memory management scheme. The first is a memory based perspective as below:
+
+### Memory Perspective
+<pre>
++ AllocationManager
+|
+|-- UnsignedDirectLittleEndian (One per AllocationManager)
+|
+|-+ BufferLedger 1 ==> Allocator A (owning)
+| ` - ArrowBuf 1
+|-+ BufferLedger 2 ==> Allocator B (non-owning)
+| ` - ArrowBuf 2
+|-+ BufferLedger 3 ==> Allocator C (non-owning)
+  | - ArrowBuf 3
+  | - ArrowBuf 4
+  ` - ArrowBuf 5
+</pre>
+
+In this picture, a piece of memory is owned by an allocator manager. An allocator manager is responsible for that piece of memory no matter which allocator(s) it is working with. An allocator manager will have relationships with a piece of raw memory (via its reference to UnsignedDirectLittleEndian) as well as references to each BufferAllocator it has a relationship to. 
+
+### Allocator Perspective
+<pre>
++ RootAllocator
+|-+ ChildAllocator 1
+| | - ChildAllocator 1.1
+| ` ...
+|
+|-+ ChildAllocator 2
+|-+ ChildAllocator 3
+| |
+| |-+ BufferLedger 1 ==> AllocationManager 1 (owning) ==> UDLE
+| | `- ArrowBuf 1
+| `-+ BufferLedger 2 ==> AllocationManager 2 (non-owning)==> UDLE
+| 	`- ArrowBuf 2
+|
+|-+ BufferLedger 3 ==> AllocationManager 1 (non-owning)==> UDLE
+| ` - ArrowBuf 3
+|-+ BufferLedger 4 ==> AllocationManager 2 (owning) ==> UDLE
+  | - ArrowBuf 4
+  | - ArrowBuf 5
+  ` - ArrowBuf 6
+</pre>
+
+In this picture, a RootAllocator owns three ChildAllocators. The first ChildAllocator (ChildAllocator 1) owns a subsequent ChildAllocator. ChildAllocator has two BufferLedgers/AllocationManager references. Coincidentally, each of these AllocationManager's is also associated with the RootAllocator. In this case, one of the these AllocationManagers is owned by ChildAllocator 3 (AllocationManager 1) while the other AllocationManager (AllocationManager 2) is owned/accounted for by the RootAllocator. Note that in this scenario, ArrowBuf 1 is sharing the underlying memory as ArrowBuf 3. However the subset of that memory (e.g. through slicing) might be different. Also note that ArrowBuf 2 and ArrowBuf 4, 5 and 6 are also sharing the same underlying memory. Also note that ArrowBuf 4, 5 and 6 all share the same reference count and fate.
+
+## Debugging Issues
+The Allocator object provides a useful set of tools to better understand the status of the allocator. If in `DEBUG` mode, the allocator and supporting classes will record additional debug tracking information to better track down memory leaks and issues. To enable DEBUG mode, either enable Java assertions with `-ea` or pass the following system property to the VM when starting `-Darrow.memory.debug.allocator=true`. The BufferAllocator also provides a `BufferAllocator.toVerboseString()` which can be used in DEBUG mode to get extensive stacktrace information and events associated with various Allocator behaviors.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
new file mode 100644
index 0000000..571fc37
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The root allocator for using direct memory inside a Drillbit. Supports creating a
+ * tree of descendant child allocators.
+ */
+public class RootAllocator extends BaseAllocator {
+
+  public RootAllocator(final long limit) {
+    super(null, "ROOT", 0, limit);
+  }
+
+  /**
+   * Verify the accounting state of the allocation system.
+   */
+  @VisibleForTesting
+  public void verify() {
+    verifyAllocator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/package-info.java b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
new file mode 100644
index 0000000..712af30
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *  Memory Allocation, Account and Management
+ *
+ *  See the README.md file in this directory for detailed information about Drill's memory allocation subsystem.
+ *
+ */
+package org.apache.arrow.memory;

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
new file mode 100644
index 0000000..28d0785
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AssertionUtil.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory.util;
+
+public class AssertionUtil {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class);
+
+  public static final boolean ASSERT_ENABLED;
+
+  static{
+    boolean isAssertEnabled = false;
+    assert isAssertEnabled = true;
+    ASSERT_ENABLED = isAssertEnabled;
+  }
+
+  public static boolean isAssertionsEnabled(){
+    return ASSERT_ENABLED;
+  }
+
+  private AssertionUtil() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
new file mode 100644
index 0000000..94e5cc5
--- /dev/null
+++ b/java/memory/src/main/java/org/apache/arrow/memory/util/AutoCloseableLock.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.memory.util;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Simple wrapper class that allows Locks to be released via an try-with-resources block.
+ */
+public class AutoCloseableLock implements AutoCloseable {
+
+  private final Lock lock;
+
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
+  }
+
+  public AutoCloseableLock open() {
+    lock.lock();
+    return this;
+  }
+
+  @Override
+  public void close() {
+    lock.unlock();
+  }
+
+}