You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:34 UTC

[11/13] drill git commit: DRILL-4134: Allocator Improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
deleted file mode 100644
index 6ea5670..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.buffer.DrillBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.Pointer;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-
-public class TestBaseAllocator {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
-
-  private final static int MAX_ALLOCATION = 8 * 1024;
-
-/*
-  // ---------------------------------------- DEBUG -----------------------------------
-
-  @After
-  public void checkBuffers() {
-    final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
-    if (bufferCount != 0) {
-      UnsafeDirectLittleEndian.logBuffers(logger);
-      UnsafeDirectLittleEndian.releaseBuffers();
-    }
-
-    assertEquals(0, bufferCount);
-  }
-
-//  @AfterClass
-//  public static void dumpBuffers() {
-//    UnsafeDirectLittleEndian.logBuffers(logger);
-//  }
-
-  // ---------------------------------------- DEBUG ------------------------------------
-*/
-
-  // Concoct ExecutionControls that won't try to inject anything.
-  @Mock private static final OptionManager optionManager = Mockito.mock(OptionManager.class);
-  static {
-    Mockito.when(optionManager.getOption(Matchers.anyString()))
-      .thenReturn(null);
-  }
-
-  @Mock private static final ExecutionControls executionControls = new ExecutionControls(optionManager, null);
-
-  private final static class NamedOwner implements AllocatorOwner {
-    private final String name;
-
-    public NamedOwner(final String name) {
-      this.name = name;
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-
-    @Override
-    public ExecutionControls getExecutionControls() {
-      return executionControls;
-    }
-
-    @Override
-    public FragmentContext getFragmentContext() {
-      return null;
-    }
-  }
-
-  @Test
-  public void test_privateMax() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("noLimits");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_LOCAL_MAX, 0, MAX_ALLOCATION, 0)) {
-      final DrillBuf drillBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
-      assertNotNull("allocation failed", drillBuf1);
-
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        final DrillBuf drillBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
-        assertNotNull("allocation failed", drillBuf2);
-        drillBuf2.release();
-      }
-
-      drillBuf1.release();
-    }
-  }
-
-  @Test(expected=IllegalStateException.class)
-  public void testRootAllocator_closeWithOutstanding() throws Exception {
-    try {
-      try(final RootAllocator rootAllocator =
-          new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-        final DrillBuf drillBuf = rootAllocator.buffer(512);
-        assertNotNull("allocation failed", drillBuf);
-      }
-    } finally {
-      /*
-       * We expect there to be one unreleased underlying buffer because we're closing
-       * without releasing it.
-       */
-/*
-      // ------------------------------- DEBUG ---------------------------------
-      final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
-      UnsafeDirectLittleEndian.releaseBuffers();
-      assertEquals(1, bufferCount);
-      // ------------------------------- DEBUG ---------------------------------
-*/
-    }
-  }
-
-  @Test
-  public void testRootAllocator_getEmpty() throws Exception {
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final DrillBuf drillBuf = rootAllocator.buffer(0);
-      assertNotNull("allocation failed", drillBuf);
-      assertEquals("capacity was non-zero", 0, drillBuf.capacity());
-      drillBuf.release();
-    }
-  }
-
-  @Ignore // TODO(DRILL-2740)
-  @Test(expected = IllegalStateException.class)
-  public void testAllocator_unreleasedEmpty() throws Exception {
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      @SuppressWarnings("unused")
-      final DrillBuf drillBuf = rootAllocator.buffer(0);
-    }
-  }
-
-  @Test
-  public void testAllocator_transferOwnership() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("changeOwnership");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final BufferAllocator childAllocator1 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator2 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
-      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
-      rootAllocator.verify();
-      final boolean allocationFit = childAllocator2.takeOwnership(drillBuf1);
-      rootAllocator.verify();
-      assertTrue(allocationFit);
-
-      childAllocator1.close();
-      rootAllocator.verify();
-
-      drillBuf1.release();
-      childAllocator2.close();
-    }
-  }
-
-  @Test
-  public void testAllocator_shareOwnership() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("shareOwnership");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final BufferAllocator childAllocator1 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator2 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
-      rootAllocator.verify();
-      final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
-      boolean allocationFit;
-
-      allocationFit = childAllocator2.shareOwnership(drillBuf1, pDrillBuf);
-      assertTrue(allocationFit);
-      rootAllocator.verify();
-      final DrillBuf drillBuf2 = pDrillBuf.value;
-      assertNotNull(drillBuf2);
-      assertNotEquals(drillBuf2, drillBuf1);
-
-      drillBuf1.release();
-      rootAllocator.verify();
-      childAllocator1.close();
-      rootAllocator.verify();
-
-      final BufferAllocator childAllocator3 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      allocationFit = childAllocator3.shareOwnership(drillBuf2, pDrillBuf);
-      assertTrue(allocationFit);
-      final DrillBuf drillBuf3 = pDrillBuf.value;
-      assertNotNull(drillBuf3);
-      assertNotEquals(drillBuf3, drillBuf1);
-      assertNotEquals(drillBuf3, drillBuf2);
-      rootAllocator.verify();
-
-      drillBuf2.release();
-      rootAllocator.verify();
-      childAllocator2.close();
-      rootAllocator.verify();
-
-      drillBuf3.release();
-      rootAllocator.verify();
-      childAllocator3.close();
-    }
-  }
-
-  @Test
-  public void testRootAllocator_createChildAndUse() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("createChildAndUse");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        final DrillBuf drillBuf = childAllocator.buffer(512);
-        assertNotNull("allocation failed", drillBuf);
-        drillBuf.release();
-      }
-    }
-  }
-
-  @Test(expected=IllegalStateException.class)
-  public void testRootAllocator_createChildDontClose() throws Exception {
-    try {
-      final AllocatorOwner allocatorOwner = new NamedOwner("createChildDontClose");
-      try(final RootAllocator rootAllocator =
-          new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-        final BufferAllocator childAllocator =
-            rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-        final DrillBuf drillBuf = childAllocator.buffer(512);
-        assertNotNull("allocation failed", drillBuf);
-      }
-    } finally {
-      /*
-       * We expect one underlying buffer because we closed a child allocator without
-       * releasing the buffer allocated from it.
-       */
-/*
-      // ------------------------------- DEBUG ---------------------------------
-      final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
-      UnsafeDirectLittleEndian.releaseBuffers();
-      assertEquals(1, bufferCount);
-      // ------------------------------- DEBUG ---------------------------------
-*/
-    }
-  }
-
-  private static void allocateAndFree(final BufferAllocator allocator) {
-    final DrillBuf drillBuf = allocator.buffer(512);
-    assertNotNull("allocation failed", drillBuf);
-    drillBuf.release();
-
-    final DrillBuf drillBuf2 = allocator.buffer(MAX_ALLOCATION);
-    assertNotNull("allocation failed", drillBuf2);
-    drillBuf2.release();
-
-    final int nBufs = 8;
-    final DrillBuf[] drillBufs = new DrillBuf[nBufs];
-    for(int i = 0; i < drillBufs.length; ++i) {
-      DrillBuf drillBufi = allocator.buffer(MAX_ALLOCATION / nBufs);
-      assertNotNull("allocation failed", drillBufi);
-      drillBufs[i] = drillBufi;
-    }
-    for(DrillBuf drillBufi : drillBufs) {
-      drillBufi.release();
-    }
-  }
-
-  @Test
-  public void testAllocator_manyAllocations() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("manyAllocations");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        allocateAndFree(childAllocator);
-      }
-    }
-  }
-
-  @Test
-  public void testAllocator_overAllocate() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("overAllocate");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        allocateAndFree(childAllocator);
-
-        try {
-          childAllocator.buffer(MAX_ALLOCATION + 1);
-          fail("allocated memory beyond max allowed");
-        } catch(OutOfMemoryRuntimeException e) {
-          // expected
-        }
-      }
-    }
-  }
-
-  @Test
-  public void testAllocator_overAllocateParent() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("overAllocateParent");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        final DrillBuf drillBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
-        assertNotNull("allocation failed", drillBuf1);
-        final DrillBuf drillBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
-        assertNotNull("allocation failed", drillBuf2);
-
-        try {
-          childAllocator.buffer(MAX_ALLOCATION / 4);
-          fail("allocated memory beyond max allowed");
-        } catch(OutOfMemoryRuntimeException e) {
-          // expected
-        }
-
-        drillBuf1.release();
-        drillBuf2.release();
-      }
-    }
-  }
-
-  private static void testAllocator_sliceUpBufferAndRelease(
-      final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
-    final DrillBuf drillBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2);
-    rootAllocator.verify();
-
-    final DrillBuf drillBuf2 = drillBuf1.slice(16, drillBuf1.capacity() - 32);
-    rootAllocator.verify();
-    final DrillBuf drillBuf3 = drillBuf2.slice(16, drillBuf2.capacity() - 32);
-    rootAllocator.verify();
-    @SuppressWarnings("unused")
-    final DrillBuf drillBuf4 = drillBuf3.slice(16, drillBuf3.capacity() - 32);
-    rootAllocator.verify();
-
-    drillBuf3.release(); // since they share refcounts, one is enough to release them all
-    rootAllocator.verify();
-  }
-
-  @Test
-  public void testAllocator_createSlices() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("createSlices");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
-
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
-      }
-      rootAllocator.verify();
-
-      testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
-
-      try(final BufferAllocator childAllocator =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        try(final BufferAllocator childAllocator2 =
-            childAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-          final DrillBuf drillBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8);
-          @SuppressWarnings("unused")
-          final DrillBuf drillBuf2 = drillBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
-          testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
-          drillBuf1.release();
-          rootAllocator.verify();
-        }
-        rootAllocator.verify();
-
-        testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
-      }
-      rootAllocator.verify();
-    }
-  }
-
-  @Test
-  public void testAllocator_sliceRanges() throws Exception {
-//    final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      // Populate a buffer with byte values corresponding to their indices.
-      final DrillBuf drillBuf = rootAllocator.buffer(256, 256 + 256);
-      assertEquals(256, drillBuf.capacity());
-      assertEquals(256 + 256, drillBuf.maxCapacity());
-      assertEquals(0, drillBuf.readerIndex());
-      assertEquals(0, drillBuf.readableBytes());
-      assertEquals(0, drillBuf.writerIndex());
-      assertEquals(256, drillBuf.writableBytes());
-
-      final DrillBuf slice3 = (DrillBuf) drillBuf.slice();
-      assertEquals(0, slice3.readerIndex());
-      assertEquals(0, slice3.readableBytes());
-      assertEquals(0, slice3.writerIndex());
-//      assertEquals(256, slice3.capacity());
-//      assertEquals(256, slice3.writableBytes());
-
-      for(int i = 0; i < 256; ++i) {
-        drillBuf.writeByte(i);
-      }
-      assertEquals(0, drillBuf.readerIndex());
-      assertEquals(256, drillBuf.readableBytes());
-      assertEquals(256, drillBuf.writerIndex());
-      assertEquals(0, drillBuf.writableBytes());
-
-      final DrillBuf slice1 = (DrillBuf) drillBuf.slice();
-      assertEquals(0, slice1.readerIndex());
-      assertEquals(256, slice1.readableBytes());
-      for(int i = 0; i < 10; ++i) {
-        assertEquals(i, slice1.readByte());
-      }
-      assertEquals(256 - 10, slice1.readableBytes());
-      for(int i = 0; i < 256; ++i) {
-        assertEquals((byte) i, slice1.getByte(i));
-      }
-
-      final DrillBuf slice2 = (DrillBuf) drillBuf.slice(25, 25);
-      assertEquals(0, slice2.readerIndex());
-      assertEquals(25, slice2.readableBytes());
-      for(int i = 25; i < 50; ++i) {
-        assertEquals(i, slice2.readByte());
-      }
-
-/*
-      for(int i = 256; i > 0; --i) {
-        slice3.writeByte(i - 1);
-      }
-      for(int i = 0; i < 256; ++i) {
-        assertEquals(255 - i, slice1.getByte(i));
-      }
-*/
-
-      drillBuf.release(); // all the derived buffers share this fate
-    }
-  }
-
-  @Test
-  public void testAllocator_slicesOfSlices() throws Exception {
-//    final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      // Populate a buffer with byte values corresponding to their indices.
-      final DrillBuf drillBuf = rootAllocator.buffer(256, 256 + 256);
-      for(int i = 0; i < 256; ++i) {
-        drillBuf.writeByte(i);
-      }
-
-      // Slice it up.
-      final DrillBuf slice0 = drillBuf.slice(0, drillBuf.capacity());
-      for(int i = 0; i < 256; ++i) {
-        assertEquals((byte) i, drillBuf.getByte(i));
-      }
-
-      final DrillBuf slice10 = slice0.slice(10, drillBuf.capacity() - 10);
-      for(int i = 10; i < 256; ++i) {
-        assertEquals((byte) i, slice10.getByte(i - 10));
-      }
-
-      final DrillBuf slice20 = slice10.slice(10, drillBuf.capacity() - 20);
-      for(int i = 20; i < 256; ++i) {
-        assertEquals((byte) i, slice20.getByte(i - 20));
-      }
-
-      final DrillBuf slice30 = slice20.slice(10,  drillBuf.capacity() - 30);
-      for(int i = 30; i < 256; ++i) {
-        assertEquals((byte) i, slice30.getByte(i - 30));
-      }
-
-      drillBuf.release();
-    }
-  }
-
-  @Test
-  public void testAllocator_transferSliced() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("transferSliced");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final BufferAllocator childAllocator1 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator2 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
-      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
-      final DrillBuf drillBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
-
-      final DrillBuf drillBuf1s = drillBuf1.slice(0, drillBuf1.capacity() / 2);
-      final DrillBuf drillBuf2s = drillBuf2.slice(0, drillBuf2.capacity() / 2);
-
-      rootAllocator.verify();
-
-      childAllocator1.takeOwnership(drillBuf2s);
-      rootAllocator.verify();
-      childAllocator2.takeOwnership(drillBuf1s);
-      rootAllocator.verify();
-
-      drillBuf1s.release(); // releases drillBuf1
-      drillBuf2s.release(); // releases drillBuf2
-
-      childAllocator1.close();
-      childAllocator2.close();
-    }
-  }
-
-  @Test
-  public void testAllocator_shareSliced() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("transferSliced");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final BufferAllocator childAllocator1 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator2 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
-      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
-      final DrillBuf drillBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
-
-      final DrillBuf drillBuf1s = drillBuf1.slice(0, drillBuf1.capacity() / 2);
-      final DrillBuf drillBuf2s = drillBuf2.slice(0, drillBuf2.capacity() / 2);
-
-      rootAllocator.verify();
-
-      final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
-      childAllocator1.shareOwnership(drillBuf2s, pDrillBuf);
-      final DrillBuf drillBuf2s1 = pDrillBuf.value;
-      childAllocator2.shareOwnership(drillBuf1s, pDrillBuf);
-      final DrillBuf drillBuf1s2 = pDrillBuf.value;
-      rootAllocator.verify();
-
-      drillBuf1s.release(); // releases drillBuf1
-      drillBuf2s.release(); // releases drillBuf2
-      rootAllocator.verify();
-
-      drillBuf2s1.release(); // releases the shared drillBuf2 slice
-      drillBuf1s2.release(); // releases the shared drillBuf1 slice
-
-      childAllocator1.close();
-      childAllocator2.close();
-    }
-  }
-
-  @Test
-  public void testAllocator_transferShared() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("transferShared");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      final BufferAllocator childAllocator1 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator2 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      final BufferAllocator childAllocator3 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
-      final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
-
-      final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
-      boolean allocationFit;
-
-      allocationFit = childAllocator2.shareOwnership(drillBuf1, pDrillBuf);
-      assertTrue(allocationFit);
-      rootAllocator.verify();
-      final DrillBuf drillBuf2 = pDrillBuf.value;
-      assertNotNull(drillBuf2);
-      assertNotEquals(drillBuf2, drillBuf1);
-
-      allocationFit = childAllocator3.takeOwnership(drillBuf1);
-      assertTrue(allocationFit);
-      rootAllocator.verify();
-
-      // Since childAllocator3 now has childAllocator1's buffer, 1, can close
-      childAllocator1.close();
-      rootAllocator.verify();
-
-      drillBuf2.release();
-      childAllocator2.close();
-      rootAllocator.verify();
-
-      final BufferAllocator childAllocator4 =
-          rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-      allocationFit = childAllocator4.takeOwnership(drillBuf1);
-      assertTrue(allocationFit);
-      rootAllocator.verify();
-
-      childAllocator3.close();
-      rootAllocator.verify();
-
-      drillBuf1.release();
-      childAllocator4.close();
-      rootAllocator.verify();
-    }
-  }
-
-  @Test
-  public void testAllocator_unclaimedReservation() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("unclaimedReservation");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator1 =
-            rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        try(final AllocationReservation reservation = childAllocator1.newReservation()) {
-          assertTrue(reservation.add(64));
-        }
-        rootAllocator.verify();
-      }
-    }
-  }
-
-  @Test
-  public void testAllocator_claimedReservation() throws Exception {
-    final AllocatorOwner allocatorOwner = new NamedOwner("claimedReservation");
-    try(final RootAllocator rootAllocator =
-        new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
-      try(final BufferAllocator childAllocator1 =
-            rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
-        try(final AllocationReservation reservation = childAllocator1.newReservation()) {
-          assertTrue(reservation.add(32));
-          assertTrue(reservation.add(32));
-
-          final DrillBuf drillBuf = reservation.buffer();
-          assertEquals(64, drillBuf.capacity());
-          rootAllocator.verify();
-
-          drillBuf.release();
-          rootAllocator.verify();
-        }
-        rootAllocator.verify();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 94aa84e..7207bf2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.control.Controller;
@@ -112,7 +113,7 @@ public class TestOptiqPlans extends ExecTest {
         workBus,
         new LocalPStoreProvider(config));
     final QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(),
-        bitContext);
+        bitContext, QueryId.getDefaultInstance());
     final PhysicalPlanReader reader = bitContext.getPlanReader();
     final LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
     final PhysicalPlan pp = new BasicOptimizer(qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 0a9b470..96f2b33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -24,9 +24,6 @@ import io.netty.buffer.DrillBuf;
 
 import java.nio.charset.Charset;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.SchemaPath;
@@ -47,9 +44,9 @@ import org.apache.drill.exec.expr.holders.RepeatedVarBinaryHolder;
 import org.apache.drill.exec.expr.holders.UInt1Holder;
 import org.apache.drill.exec.expr.holders.UInt4Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.BitVector;
@@ -67,6 +64,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
 public class TestValueVector extends ExecTest {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestValueVector.class);
 
@@ -156,11 +156,11 @@ public class TestValueVector extends ExecTest {
     final int expectedOffsetSize = 10;
     try {
       vector.allocateNew(expectedAllocationInBytes, 10);
-      assertEquals(expectedOffsetSize, vector.getValueCapacity());
-      assertEquals(expectedAllocationInBytes, vector.getBuffer().capacity());
+      assertTrue(expectedOffsetSize <= vector.getValueCapacity());
+      assertTrue(expectedAllocationInBytes <= vector.getBuffer().capacity());
       vector.reAlloc();
-      assertEquals(expectedOffsetSize * 2, vector.getValueCapacity());
-      assertEquals(expectedAllocationInBytes * 2, vector.getBuffer().capacity());
+      assertTrue(expectedOffsetSize * 2 <= vector.getValueCapacity());
+      assertTrue(expectedAllocationInBytes * 2 <= vector.getBuffer().capacity());
     } finally {
       vector.close();
     }
@@ -666,8 +666,11 @@ the interface to load has changed
       for (int i = 0; i < valueVectors.length; i++) {
         final ValueVector vv = valueVectors[i];
         final int vvCapacity = vv.getValueCapacity();
-        assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
-            initialCapacity, vvCapacity);
+
+        // this can't be equality because Nullables will be allocated using power of two sized buffers (thus need 1025
+        // spots in one vector > power of two is 2048, available capacity will be 2048 => 2047)
+        assertTrue(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
+            initialCapacity <= vvCapacity);
       }
     } finally {
       AutoCloseables.close(valueVectors);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 107f978..73ed65e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -18,12 +18,18 @@
 package org.apache.drill.exec.server;
 
 import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.NonStrictExpectations;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
@@ -35,22 +41,21 @@ import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.rpc.data.AckSender;
 import org.apache.drill.exec.rpc.data.DataConnectionManager;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
 import org.apache.drill.exec.rpc.data.DataServer;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
@@ -60,33 +65,59 @@ import org.junit.Test;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-import mockit.Injectable;
-import mockit.NonStrictExpectations;
-
 public class TestBitRpc extends ExecTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
 
   @Test
-  public void testConnectionBackpressure(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus, @Injectable final FragmentManager fman, @Injectable final FragmentContext fcon) throws Exception {
+  public void testConnectionBackpressure(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
 
     DrillConfig config1 = DrillConfig.create();
     final BootStrapContext c = new BootStrapContext(config1, ClassPathScanner.fromPrescan(config1));
     DrillConfig config2 = DrillConfig.create();
     BootStrapContext c2 = new BootStrapContext(config2, ClassPathScanner.fromPrescan(config2));
 
+    final FragmentContext fcon = new MockUp<FragmentContext>(){
+      BufferAllocator getAllocator(){
+        return c.getAllocator();
+      }
+    }.getMockInstance();
+
+    final FragmentManager fman = new MockUp<FragmentManager>(){
+      int v = 0;
+
+      @Mock
+      boolean handle(IncomingDataBatch batch) throws FragmentSetupException, IOException {
+        try {
+          v++;
+          if (v % 10 == 0) {
+            System.out.println("sleeping.");
+            Thread.sleep(3000);
+          }
+        } catch (InterruptedException e) {
+
+        }
+        RawFragmentBatch rfb = batch.newRawFragmentBatch(c.getAllocator());
+        rfb.sendOk();
+        rfb.release();
+
+        return true;
+      }
+
+      public FragmentContext getFragmentContext(){
+        return fcon;
+      }
+
+    }.getMockInstance();
+
+
     new NonStrictExpectations() {{
       workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
       workBus.getFragmentManager( (FragmentHandle) any); result = fman;
-      fman.getFragmentContext(); result = fcon;
-      fcon.getAllocator(); result = c.getAllocator();
     }};
 
     int port = 1234;
 
-    DataResponseHandler drp = new BitComTestHandler();
-    DataServer server = new DataServer(c, workBus, drp);
+    DataServer server = new DataServer(c, c.getAllocator(), workBus, null);
 
     port = server.bind(port, true);
     DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
@@ -154,31 +185,4 @@ public class TestBitRpc extends ExecTest {
     }
   }
 
-  private class BitComTestHandler implements DataResponseHandler {
-
-    int v = 0;
-
-    @Override
-    public void informOutOfMemory() {
-    }
-
-    @Override
-    public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, AckSender sender)
-        throws FragmentSetupException, IOException {
-      // System.out.println("Received.");
-      try {
-        v++;
-        if (v % 10 == 0) {
-          System.out.println("sleeping.");
-          Thread.sleep(3000);
-        }
-      } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      sender.sendOk();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
index c911f79..44cc3a7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -17,20 +17,21 @@
  */
 package org.apache.drill.exec.testing;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.util.Pointer;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class TestCountDownLatchInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
@@ -132,7 +133,7 @@ public class TestCountDownLatchInjection extends BaseTestQuery {
 
     ControlsInjectionUtil.setControls(session, controls);
 
-    final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
 
     final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
     (new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 40620c2..84a7320 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -17,12 +17,18 @@
  */
 package org.apache.drill.exec.testing;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.Drillbit;
@@ -30,11 +36,6 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.junit.Test;
 
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 public class TestExceptionInjection extends BaseTestQuery {
   private static final String NO_THROW_FAIL = "Didn't throw expected exception";
 
@@ -135,7 +136,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       + "}]}";
     ControlsInjectionUtil.setControls(session, jsonString);
 
-    final QueryContext context = new QueryContext(session, bits[0].getContext());
+    final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
 
     // test that the exception gets thrown
     final DummyClass dummyClass = new DummyClass(context);
@@ -156,7 +157,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       .build();
     ControlsInjectionUtil.setControls(session, controls);
 
-    final QueryContext context = new QueryContext(session, bits[0].getContext());
+    final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
 
     // test that the expected exception (checked) gets thrown
     final DummyClass dummyClass = new DummyClass(context);
@@ -185,7 +186,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       .build();
     ControlsInjectionUtil.setControls(session, controls);
 
-    final QueryContext context = new QueryContext(session, bits[0].getContext());
+    final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
 
     final DummyClass dummyClass = new DummyClass(context);
 
@@ -246,7 +247,7 @@ public class TestExceptionInjection extends BaseTestQuery {
     ControlsInjectionUtil.setControls(session, controls);
 
     {
-      final QueryContext queryContext1 = new QueryContext(session, drillbitContext1);
+      final QueryContext queryContext1 = new QueryContext(session, drillbitContext1, QueryId.getDefaultInstance());
       final DummyClass class1 = new DummyClass(queryContext1);
 
       // these shouldn't throw
@@ -268,7 +269,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       }
     }
     {
-      final QueryContext queryContext2 = new QueryContext(session, drillbitContext2);
+      final QueryContext queryContext2 = new QueryContext(session, drillbitContext2, QueryId.getDefaultInstance());
       final DummyClass class2 = new DummyClass(queryContext2);
 
       // these shouldn't throw

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index f07f676..54f851a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.exec.testing;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
@@ -24,6 +29,7 @@ import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -34,11 +40,6 @@ import org.apache.drill.exec.util.Pointer;
 import org.junit.Test;
 import org.slf4j.Logger;
 
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class TestPauseInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
@@ -126,7 +127,7 @@ public class TestPauseInjection extends BaseTestQuery {
 
     ControlsInjectionUtil.setControls(session, controls);
 
-    final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
 
     (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
 
@@ -181,7 +182,7 @@ public class TestPauseInjection extends BaseTestQuery {
       final long expectedDuration = 1000L;
       final ExtendedLatch trigger = new ExtendedLatch(1);
       final Pointer<Exception> ex = new Pointer<>();
-      final QueryContext queryContext = new QueryContext(session, drillbitContext1);
+      final QueryContext queryContext = new QueryContext(session, drillbitContext1, QueryId.getDefaultInstance());
       (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
 
       // test that the pause happens
@@ -199,7 +200,7 @@ public class TestPauseInjection extends BaseTestQuery {
 
     {
       final ExtendedLatch trigger = new ExtendedLatch(1);
-      final QueryContext queryContext = new QueryContext(session, drillbitContext2);
+      final QueryContext queryContext = new QueryContext(session, drillbitContext2, QueryId.getDefaultInstance());
 
       // if the resume did not happen, the test would hang
       final DummyClass dummyClass = new DummyClass(queryContext, trigger);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
index 7ab2da2..d2f2590 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.testing;
 
 import static org.junit.Assert.fail;
+import io.netty.buffer.DrillBuf;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
 
-import io.netty.buffer.DrillBuf;
+import javax.inject.Inject;
 
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.common.config.DrillConfig;
@@ -42,16 +44,12 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.test.DrillTest;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import javax.inject.Inject;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.Properties;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
 
 /*
  * TODO(DRILL-3170)
@@ -138,7 +136,7 @@ public class TestResourceLeak extends DrillTest {
 
     @Override
     public void eval() {
-      buf.getAllocator().buffer(1);
+      buf.retain();
       out.value = in.value;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
index 60a2268..223f4ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.vector.complex.writer;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.TestOutputMutator;
 import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/base/pom.xml b/exec/memory/base/pom.xml
index adec763..686a12b 100644
--- a/exec/memory/base/pom.xml
+++ b/exec/memory/base/pom.xml
@@ -28,7 +28,17 @@
       <version>3.0.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
+    <dependency>
+      <groupId>com.carrotsearch</groupId>
+      <artifactId>hppc</artifactId>
+      <version>0.5.2</version>
+    </dependency>
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index d244b26..138495c 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,23 +27,16 @@ import java.nio.ByteOrder;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.HistoricalLog;
-import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.memory.BoundsChecking;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.BufferManager;
-import org.apache.drill.exec.memory.BufferLedger;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.util.AssertionUtil;
-import org.apache.drill.exec.util.Pointer;
-import org.slf4j.Logger;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
@@ -51,343 +44,46 @@ import com.google.common.base.Preconditions;
 public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
 
-  private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
-  private static final boolean DEBUG = BaseAllocator.isDebug();
-  private static final AtomicInteger idGenerator = new AtomicInteger(0);
+  private static final AtomicLong idGenerator = new AtomicLong(0);
 
-  private final ByteBuf byteBuf;
+  private final long id = idGenerator.incrementAndGet();
+  private final AtomicInteger refCnt;
+  private final UnsafeDirectLittleEndian byteBuf;
   private final long addr;
   private final int offset;
-  private final int flags;
-  private final AtomicInteger rootRefCnt;
-  private volatile BufferAllocator allocator;
-
-  // TODO - cleanup
-  // The code is partly shared and partly copy-pasted between
-  // these three types. They should be unified under one interface
-  // to share code and to remove the hacky code here to use only
-  // one of these types at a time and use null checks to find out
-  // which.
-  private final boolean oldWorld; // Indicates that we're operating with TopLevelAllocator.
-  private final boolean rootBuffer;
-  private volatile Accountor acct;
-  private BufferManager bufManager;
-  @Deprecated private OperatorContext operatorContext;
-  @Deprecated private FragmentContext fragmentContext;
-
-  private volatile BufferLedger bufferLedger;
-  private volatile int length; // TODO this just seems to duplicate .capacity()
-
-  // members used purely for debugging
-  // TODO once we have a reduced number of constructors, move these to DEBUG clauses in them
-  private final int id = idGenerator.incrementAndGet();
-  private final HistoricalLog historicalLog = DEBUG ? new HistoricalLog(4, "DrillBuf[%d]", id) : null;
-  private final static IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>> unwrappedMap =
-      DEBUG ? new IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>>() : null;
-
-  // TODO(cwestin) javadoc
-  private void unwrappedPut() {
-    final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
-    synchronized(unwrappedMap) {
-      Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
-      if (drillBufs == null) {
-        drillBufs = new LinkedList<DrillBuf>();
-        unwrappedMap.put(udle, drillBufs);
-      }
-
-      drillBufs.add(this);
-    }
-  }
-
-  // TODO(cwestin) javadoc
-  public static Collection<DrillBuf> unwrappedGet(final UnsafeDirectLittleEndian udle) {
-    synchronized(unwrappedMap) {
-      final Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
-      if (drillBufs == null) {
-        return Collections.emptyList();
-      }
-      return new LinkedList<DrillBuf>(drillBufs);
-    }
-  }
-
-  // TODO(cwestin) javadoc
-  private static boolean unwrappedRemove(final DrillBuf drillBuf) {
-    final ByteBuf byteBuf = drillBuf.unwrap();
-    if (!(byteBuf instanceof UnsafeDirectLittleEndian)) {
-      return false;
-    }
-
-    final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
-    synchronized(unwrappedMap) {
-      Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
-      if (drillBufs == null) {
-        return false;
-      }
-      final Object object = drillBufs.remove(drillBuf);
-      if (drillBufs.isEmpty()) {
-        unwrappedMap.remove(udle);
-      }
-      return object != null;
-    }
-  }
-
-  public DrillBuf(BufferAllocator allocator, Accountor a, UnsafeDirectLittleEndian b) {
-    super(b.maxCapacity());
-    this.byteBuf = b;
-    this.addr = b.memoryAddress();
-    this.acct = a;
-    this.length = b.capacity();
-    this.offset = 0;
-    this.rootBuffer = true;
-    this.allocator = allocator;
-
-    // members from the new world order
-    flags = 0;
-    rootRefCnt = null;
-    oldWorld = true;
-  }
-
-  // TODO(cwestin) javadoc
-  public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
-      final UnsafeDirectLittleEndian byteBuf) {
+  private final BufferLedger ledger;
+  private final BufferManager bufManager;
+  private final ByteBufAllocator alloc;
+  private final boolean isEmpty;
+  private volatile int length;
+
+  private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
+      new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "DrillBuf[%d]", id) : null;
+
+  public DrillBuf(
+      final AtomicInteger refCnt,
+      final BufferLedger ledger,
+      final UnsafeDirectLittleEndian byteBuf,
+      final BufferManager manager,
+      final ByteBufAllocator alloc,
+      final int offset,
+      final int length,
+      boolean isEmpty) {
     super(byteBuf.maxCapacity());
+    this.refCnt = refCnt;
     this.byteBuf = byteBuf;
-    byteBuf.retain(1);
-    this.bufferLedger = bufferLedger;
-    addr = byteBuf.memoryAddress();
-    allocator = bufferAllocator;
-    length = byteBuf.capacity();
-    offset = 0;
-    flags = 0;
-    rootRefCnt = new AtomicInteger(1);
-    oldWorld = false;
-
-    // members from the old world order
-    rootBuffer = false;
-    acct = null;
-
-    if (DEBUG) {
-      unwrappedPut();
-      historicalLog.recordEvent(
-          "DrillBuf(BufferLedger, BufferAllocator[%d], UnsafeDirectLittleEndian[identityHashCode == "
-              + "%d](%s)) => rootRefCnt identityHashCode == %d",
-              bufferAllocator.getId(), System.identityHashCode(byteBuf), byteBuf.toString(),
-              System.identityHashCode(rootRefCnt));
-    }
-  }
-
-  private DrillBuf(BufferAllocator allocator, Accountor a) {
-    super(0);
-    this.byteBuf = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
-    this.allocator = allocator;
-    this.acct = a;
-    this.length = 0;
-    this.addr = 0;
-    this.rootBuffer = false;
-    this.offset = 0;
-
-    // members from the new world order
-    flags = 0;
-    rootRefCnt = null;
-    oldWorld = true;
-  }
-
-  private DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
-    super(0);
-    this.bufferLedger = bufferLedger;
-    allocator = bufferAllocator;
-
-    byteBuf = new EmptyByteBuf(bufferLedger.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
-    length = 0;
-    addr = 0;
-    flags = 0;
-    rootRefCnt = new AtomicInteger(1);
-    offset = 0;
-
-    // members from the old world order
-    rootBuffer = false;
-    acct = null;
-    oldWorld = false;
-
-    if (DEBUG) {
-      // We don't put the empty buffers in the unwrappedMap.
-      historicalLog.recordEvent(
-          "DrillBuf(BufferLedger, BufferAllocator[%d])  => rootRefCnt identityHashCode == %d",
-          bufferAllocator.getId(), System.identityHashCode(rootRefCnt));
-    }
-  }
-
-  /**
-   * Special constructor used for RPC ownership transfer.  Takes a snapshot slice of the current buf
-   *  but points directly to the underlying UnsafeLittleEndian buffer.  Does this by calling unwrap()
-   *  twice on the provided DrillBuf and expecting an UnsafeDirectLittleEndian buffer. This operation
-   *  includes taking a new reference count on the underlying buffer and maintaining returning with a
-   *  current reference count for itself (masking the underlying reference count).
-   * @param allocator
-   * @param a Allocator used when users try to receive allocator from buffer.
-   * @param b Accountor used for accounting purposes.
-   */
-  public DrillBuf(BufferAllocator allocator, Accountor a, DrillBuf b) {
-    this(allocator, a, getUnderlying(b), b, 0, b.length, true);
-    assert b.unwrap().unwrap() instanceof UnsafeDirectLittleEndian;
-    b.unwrap().unwrap().retain();
-  }
-
-  private DrillBuf(DrillBuf buffer, int index, int length) {
-    this(buffer.allocator, null, buffer, buffer, index, length, false);
-  }
-
-  private static ByteBuf getUnderlying(DrillBuf b){
-    ByteBuf underlying = b.unwrap().unwrap();
-    return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
-  }
-
-  private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
-    super(length);
-    if (index < 0 || index > buffer.capacity() - length) {
-      throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + index + ", " + length + ')');
-    }
-
-    this.length = length;
-    writerIndex(length);
-
-    this.byteBuf = replacement;
-    this.addr = buffer.memoryAddress() + index;
-    this.offset = index;
-    this.acct = a;
+    this.isEmpty = isEmpty;
+    this.bufManager = manager;
+    this.alloc = alloc;
+    this.addr = byteBuf.memoryAddress() + offset;
+    this.ledger = ledger;
     this.length = length;
-    this.rootBuffer = root;
-    this.allocator = allocator;
+    this.offset = offset;
 
-    // members from the new world order
-    flags = 0;
-    rootRefCnt = null;
-    oldWorld = true;
-  }
-
-  /**
-   * Indicate a shared refcount, as per http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5
-   */
-  private final static int F_DERIVED = 0x0002;
-
-  // TODO(cwestin) javadoc
-  /**
-   * Used for sharing.
-   *
-   * @param bufferLedger
-   * @param bufferAllocator
-   * @param originalBuf
-   * @param index
-   * @param length
-   * @param flags
-   */
-  public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
-      final DrillBuf originalBuf, final int index, final int length, final int flags) {
-    this(bufferAllocator, bufferLedger, getUnderlyingUdle(originalBuf),
-        originalBuf, index + originalBuf.offset, length, flags);
-  }
-
-  /**
-   * Unwraps a DrillBuf until the underlying UnsafeDirectLittleEndian buffer is
-   * found.
-   *
-   * @param originalBuf the original DrillBuf
-   * @return the underlying UnsafeDirectLittleEndian ByteBuf
-   */
-  private static ByteBuf getUnderlyingUdle(final DrillBuf originalBuf) {
-    int count = 1;
-    ByteBuf unwrapped = originalBuf.unwrap();
-    while(!(unwrapped instanceof UnsafeDirectLittleEndian)
-        && (!(unwrapped instanceof EmptyByteBuf))) {
-      unwrapped = unwrapped.unwrap();
-      ++count;
-    }
-
-    if (DEBUG) {
-      if (count > 1) {
-        throw new IllegalStateException("UnsafeDirectLittleEndian is wrapped more than one level");
-      }
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("create()");
     }
 
-    return unwrapped;
-  }
-
-  // TODO(cwestin) javadoc
-  /*
-   * TODO the replacement argument becomes an UnsafeDirectLittleEndian;
-   * buffer argument may go away if it is determined to be unnecessary after all
-   * the deprecated stuff is removed (I suspect only the replacement argument is
-   * necessary then).
-   */
-  private DrillBuf(BufferAllocator allocator, BufferLedger bufferLedger,
-      ByteBuf replacement, DrillBuf buffer, int index, int length, int flags) {
-    super(replacement.maxCapacity());
-
-    // members from the old world order
-    rootBuffer = false;
-    acct = null;
-    oldWorld = false;
-
-    if (index < 0 || index > (replacement.maxCapacity() - length)) {
-      throw new IndexOutOfBoundsException(replacement.toString() + ".slice(" + index + ", " + length + ')');
-    }
-
-    this.flags = flags;
-
-    this.length = length; // capacity()
-    writerIndex(length);
-
-    byteBuf = replacement;
-    if ((flags & F_DERIVED) == 0) {
-      replacement.retain(1);
-    }
-
-    addr = replacement.memoryAddress() + index;
-    offset = index;
-    this.bufferLedger = bufferLedger;
-    if (!(buffer instanceof DrillBuf)) {
-      throw new IllegalArgumentException("DrillBuf slicing can only be performed on other DrillBufs");
-    }
-
-    if ((flags & F_DERIVED) != 0) {
-      final DrillBuf rootBuf = (DrillBuf) buffer;
-      rootRefCnt = rootBuf.rootRefCnt;
-    } else {
-      rootRefCnt = new AtomicInteger(1);
-    }
-
-    this.allocator = allocator;
-
-    if (DEBUG) {
-      unwrappedPut();
-      historicalLog.recordEvent(
-          "DrillBuf(BufferAllocator[%d], BufferLedger, ByteBuf[identityHashCode == "
-              + "%d](%s), DrillBuf[%d], index = %d, length = %d, flags = 0x%08x)"
-              + " => rootRefCnt identityHashCode == %d",
-          allocator.getId(), System.identityHashCode(replacement), replacement.toString(),
-          buffer.id, index, length, flags, System.identityHashCode(rootRefCnt));
-    }
-  }
-
-  @Deprecated
-  public void setOperatorContext(OperatorContext c) {
-    this.operatorContext = c;
-  }
-
-  @Deprecated
-  public void setFragmentContext(FragmentContext c) {
-    this.fragmentContext = c;
-  }
-
-  // TODO(DRILL-3331)
-  public void setBufferManager(BufferManager bufManager) {
-    Preconditions.checkState(this.bufManager == null,
-        "the BufferManager for a buffer can only be set once");
-    this.bufManager = bufManager;
-  }
-
-  public BufferAllocator getAllocator() {
-    return allocator;
   }
 
   public DrillBuf reallocIfNeeded(final int size) {
@@ -397,11 +93,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       return this;
     }
 
-    if (operatorContext != null) {
-      return operatorContext.replace(this, size);
-    } else if(fragmentContext != null) {
-      return fragmentContext.replace(this, size);
-    } else if (bufManager != null) {
+    if (bufManager != null) {
       return bufManager.replace(this, size);
     } else {
       throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
@@ -410,15 +102,11 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public int refCnt() {
-    if (oldWorld) {
-      if(rootBuffer){
-        return (int) this.rootRefCnt.get();
-      }else{
-        return byteBuf.refCnt();
-      }
+    if (isEmpty) {
+      return 1;
+    } else {
+      return refCnt.get();
     }
-
-    return rootRefCnt.get();
   }
 
   private long addr(int index) {
@@ -431,7 +119,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
     }
     if (index < 0 || index > capacity() - fieldLength) {
-      if (DEBUG) {
+      if (BaseAllocator.DEBUG) {
         historicalLog.logHistory(logger);
       }
       throw new IndexOutOfBoundsException(String.format(
@@ -449,70 +137,106 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    * @param end The exclusive endpoint of the bytes to be read.
    */
   public void checkBytes(int start, int end) {
-    if (BOUNDS_CHECKING_ENABLED) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       checkIndexD(start, end - start);
     }
   }
 
   private void chk(int index, int width) {
-    if (BOUNDS_CHECKING_ENABLED) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       checkIndexD(index, width);
     }
   }
 
   private void ensure(int width) {
-    if (BOUNDS_CHECKING_ENABLED) {
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       ensureWritable(width);
     }
   }
 
   /**
-   * Used by allocators to transfer ownership from one allocator to another.
+   * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and
+   * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the
+   * passed in Allocator is the same as the current buffer.
+   *
+   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count + 1 for the other AllocatorManager/BufferLedger combination
+   * in the case that the provided allocator already had an association to this underlying memory.
    *
-   * @param newLedger the new ledger the buffer should use going forward
-   * @param newAllocator the new allocator
-   * @return whether or not the buffer fits the receiving allocator's allocation limit
+   * @param allocator
+   *          The target allocator to create an association with.
+   * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
    */
-  public boolean transferTo(final BufferAllocator newAllocator, final BufferLedger newLedger) {
-    final Pointer<BufferLedger> pNewLedger = new Pointer<>(newLedger);
-    final boolean fitsAllocation = bufferLedger.transferTo(newAllocator, pNewLedger, this);
-    allocator = newAllocator;
-    bufferLedger = pNewLedger.value;
-    return fitsAllocation;
+  public DrillBuf retain(BufferAllocator allocator) {
+
+    if (isEmpty) {
+      return this;
+    }
+
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("retain(%s)", allocator.getName());
+    }
+    BufferLedger otherLedger = this.ledger.getLedgerForAllocator(allocator);
+    return otherLedger.newDrillBuf(offset, length, null, true);
   }
 
   /**
-   * DrillBuf's implementation of sharing buffer functionality, to be accessed from
-   * {@link BufferAllocator#shareOwnership(DrillBuf, Pointer)}. See that function
-   * for more information.
+   * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
+   * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
+   * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If
+   * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
+   * transfer any ownership to the newly created DrillBuf.
+   *
+   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count for the other AllocatorManager/BufferLedger combination in
+   * the case that the provided allocator already had an association to this underlying memory.
    *
-   * @param otherLedger the ledger belonging to the other allocator to share with
-   * @param otherAllocator the other allocator to be shared with
-   * @param index the starting index (for slicing capability)
-   * @param length the length (for slicing capability)
-   * @return the new DrillBuf (wrapper)
+   * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
+   * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
+   * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
+   * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
+   * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
+   * slightly sooner than is necessary.
+   *
+   * @param target
+   *          The allocator to transfer ownership to.
+   * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
+   *         created DrillBuf.
    */
-  public DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
-      final int index, final int length) {
-    return shareWith(otherLedger, otherAllocator, index, length, 0);
-  }
+  public TransferResult transferOwnership(BufferAllocator target) {
 
-  // TODO(cwestin) javadoc
-  private DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
-      final int index, final int length, final int flags) {
-    final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
-    bufferLedger = bufferLedger.shareWith(pDrillBuf, otherLedger, otherAllocator, this, index, length, flags);
-    return pDrillBuf.value;
+    if (isEmpty) {
+      return new TransferResult(true, this);
+    }
+
+    final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
+    final DrillBuf newBuf = otherLedger.newDrillBuf(offset, length, null, true);
+    final boolean allocationFit = this.ledger.transferBalance(otherLedger);
+    return new TransferResult(allocationFit, newBuf);
   }
 
-  public boolean transferAccounting(Accountor target) {
-    if (rootBuffer) {
-      boolean outcome = acct.transferTo(target, this, length);
-      acct = target;
-      return outcome;
-    } else {
-      throw new UnsupportedOperationException();
+  /**
+   * The outcome of a Transfer.
+   */
+  public class TransferResult {
+
+    /**
+     * Whether this transfer fit within the target allocator's capacity.
+     */
+    public final boolean allocationFit;
+
+    /**
+     * The newly created buffer associated with the target allocator.
+     */
+    public final DrillBuf buffer;
+
+    private TransferResult(boolean allocationFit, DrillBuf buffer) {
+      this.allocationFit = allocationFit;
+      this.buffer = buffer;
     }
+
   }
 
   @Override
@@ -525,40 +249,28 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    */
   @Override
   public synchronized boolean release(int decrement) {
-    Preconditions.checkArgument(decrement > 0,
-        "release(%d) argument is not positive", decrement);
-    if (DEBUG) {
-      historicalLog.recordEvent("release(%d)", decrement);
+    if (isEmpty) {
+      return false;
     }
 
-    if (oldWorld) {
-      if(rootBuffer){
-        final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
-        Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
-        if (newRefCnt == 0) {
-          byteBuf.release(decrement);
-          acct.release(this, length);
-          return true;
-        }else{
-          return false;
-        }
-      }else{
-        return byteBuf.release(decrement);
-      }
+    if (decrement < 1) {
+      throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s",
+          decrement, toVerboseString()));
     }
 
-    final int refCnt = rootRefCnt.addAndGet(-decrement);
-    Preconditions.checkState(refCnt >= 0, "DrillBuf[%d] refCnt has gone negative", id);
-    if (refCnt == 0) {
-      bufferLedger.release(this);
+    final int refCnt = this.refCnt.addAndGet(-decrement);
 
-      if (DEBUG) {
-        unwrappedRemove(this);
-      }
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("release(%d). original value: %d", decrement, refCnt + decrement);
+    }
 
-      // release the underlying buffer
-      byteBuf.release(1);
+    if (refCnt < 0) {
+      throw new IllegalStateException(
+          String.format("DrillBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
 
+    }
+    if (refCnt == 0) {
+      ledger.release();
       return true;
     }
 
@@ -571,37 +283,16 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   @Override
-  public synchronized ByteBuf capacity(int newCapacity) {
-    if (oldWorld) {
-      if (rootBuffer) {
-        if (newCapacity == length) {
-          return this;
-        } else if (newCapacity < length) {
-          byteBuf.capacity(newCapacity);
-          int diff = length - byteBuf.capacity();
-          acct.releasePartial(this, diff);
-          this.length = length - diff;
-          return this;
-        } else {
-          throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
-        }
-      } else {
-        throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
-      }
-    }
-
-    if ((flags & F_DERIVED) != 0) {
-      throw new UnsupportedOperationException("Derived buffers don't support resizing.");
-    }
+  public synchronized DrillBuf capacity(int newCapacity) {
 
     if (newCapacity == length) {
       return this;
     }
 
+    Preconditions.checkArgument(newCapacity >= 0);
+
     if (newCapacity < length) {
-      byteBuf.capacity(newCapacity);
-      final int diff = length - byteBuf.capacity();
-      length -= diff;
+      length = newCapacity;
       return this;
     }
 
@@ -673,10 +364,9 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
 
   @Override
   public DrillBuf slice(int index, int length) {
-    if (oldWorld) {
-      DrillBuf buf = new DrillBuf(this, index, length);
-      buf.writerIndex = length;
-      return buf;
+
+    if (isEmpty) {
+      return this;
     }
 
     /*
@@ -684,17 +374,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
      * see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which explains
      * that derived buffers share their reference count with their parent
      */
-    final DrillBuf buf = shareWith(bufferLedger, allocator, index, length, F_DERIVED);
-    buf.writerIndex(length);
-    return buf;
+    final DrillBuf newBuf = ledger.newDrillBuf(offset + index, length);
+    newBuf.writerIndex(length);
+    return newBuf;
   }
 
   @Override
   public DrillBuf duplicate() {
-    if (oldWorld) {
-      return new DrillBuf(this, 0, length);
-    }
-
     return slice(0, length);
   }
 
@@ -766,8 +452,8 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public String toString(int index, int length, Charset charset) {
     final String basics =
-        String.format("{DrillBuf[%d], udle identityHashCode == %d, rootRefCnt identityHashCode == %d}",
-            id, System.identityHashCode(byteBuf), System.identityHashCode(rootRefCnt));
+        String.format("{DrillBuf[%d], udle identityHashCode == %d, identityHashCode == %d}",
+            id, System.identityHashCode(byteBuf), System.identityHashCode(refCnt));
 
     if (length == 0) {
       return basics;
@@ -799,20 +485,16 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public ByteBuf retain(int increment) {
     Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
-    if (DEBUG) {
-      historicalLog.recordEvent("retain(%d)", increment);
-    }
 
-    if (oldWorld) {
-      if(rootBuffer){
-        this.rootRefCnt.addAndGet(increment);
-      }else{
-        byteBuf.retain(increment);
-      }
+    if (isEmpty) {
       return this;
     }
 
-    rootRefCnt.addAndGet(increment);
+    if (BaseAllocator.DEBUG) {
+      historicalLog.recordEvent("retain(%d)", increment);
+    }
+
+    refCnt.addAndGet(increment);
     return this;
   }
 
@@ -1109,65 +791,42 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return PlatformDependent.getByte(addr(index));
   }
 
-  public static DrillBuf getEmpty(BufferAllocator allocator, Accountor a) {
-    return new DrillBuf(allocator, a);
-  }
-
-  public static DrillBuf getEmpty(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
-    return new DrillBuf(bufferLedger, bufferAllocator);
+  @Override
+  public void close() {
+    release();
   }
 
   /**
-   * Find out if this is a "root buffer." This is obsolete terminology
-   * based on the original implementation of DrillBuf, which would layer
-   * DrillBufs on top of other DrillBufs when slicing (or duplicating).
-   * The buffer at the bottom of the layer was the "root buffer." However,
-   * the current implementation flattens such references to always make
-   * DrillBufs that are wrap a single buffer underneath, and slices and
-   * their original source have a shared fate as per
-   * http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, so
-   * this concept isn't really meaningful anymore. But there are callers
-   * that want to know a buffer's original size, and whether or not it
-   * is "primal" in some sense. Perhaps this just needs a new name that
-   * indicates that the buffer was an "original" and not a slice.
+   * Returns the possible memory consumed by this DrillBuf in the worse case scenario. (not shared, connected to larger
+   * underlying buffer of allocated memory)
    *
-   * @return whether or not the buffer is an original
+   * @return Size in bytes.
    */
-  @Deprecated
-  public boolean isRootBuffer() {
-    if (oldWorld) {
-      return rootBuffer;
-    }
-
-    return (flags & F_DERIVED) == 0;
-  }
-
-  @Override
-  public void close() {
-    release();
+  public int getPossibleMemoryConsumed() {
+    return ledger.getSize();
   }
 
   /**
-   * Indicates whether this DrillBuf and the supplied one have a "shared fate."
-   * Having a "shared fate" indicates that the two DrillBufs share a reference
-   * count, and will both be released at the same time if either of them is
-   * released.
-   * @param otherBuf the other buffer to check against
-   * @return true if the two buffers have a shared fate, false otherwise
+   * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the
+   * associated allocator).
+   *
+   * @return Size in bytes.
    */
-  public boolean hasSharedFate(final DrillBuf otherBuf) {
-    return rootRefCnt == otherBuf.rootRefCnt;
+  public int getActualMemoryConsumed() {
+    return ledger.getAccountedSize();
   }
 
   private final static int LOG_BYTES_PER_ROW = 10;
+
   /**
-   * Log this buffer's byte contents in the form of a hex dump.
-   *
-   * @param logger where to log to
-   * @param start the starting byte index
-   * @param length how many bytes to log
+   * Return the buffer's byte contents in the form of a hex dump.
+   * @param start
+   *          the starting byte index
+   * @param length
+   *          how many bytes to log
+   * @return A hex dump in a String.
    */
-  public void logBytes(final Logger logger, final int start, final int length) {
+  public String toHexString(final int start, final int length) {
     final int roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
 
     final StringBuilder sb = new StringBuilder("buffer byte dump\n");
@@ -1184,7 +843,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       }
       sb.append('\n');
     }
-    logger.trace(sb.toString());
+    return sb.toString();
   }
 
   /**
@@ -1192,27 +851,28 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
    *
    * @return integer id
    */
-  public int getId() {
+  public long getId() {
     return id;
   }
 
-  /**
-   * Log this buffer's history.
-   *
-   * @param logger the logger to use
-   */
-  public void logHistory(final Logger logger) {
-    if (historicalLog == null) {
-      logger.warn("DrillBuf[{}] historicalLog not available", id);
-    } else {
-      historicalLog.logHistory(logger);
+
+  public String toVerboseString() {
+    if (isEmpty) {
+      return toString();
     }
+
+    StringBuilder sb = new StringBuilder();
+    ledger.print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+    return sb.toString();
   }
 
-  public void logHistoryForUdle(final Logger logger, final UnsafeDirectLittleEndian udle) {
-    final Collection<DrillBuf> drillBufs = unwrappedGet(udle);
-    for(final DrillBuf drillBuf : drillBufs) {
-      drillBuf.logHistory(logger);
+  public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+    BaseAllocator.indent(sb, indent).append(toString());
+
+    if (BaseAllocator.DEBUG && !isEmpty && verbosity.includeHistoricalLog) {
+      sb.append("\n");
+      historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
new file mode 100644
index 0000000..7788552
--- /dev/null
+++ b/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+
+/**
+ * Allows us to decorate DrillBuf to make it expandable so that we can use them in the context of the Netty framework
+ * (thus supporting RPC level memory accounting).
+ */
+public class ExpandableByteBuf extends MutableWrappedByteBuf {
+
+  private final BufferAllocator allocator;
+
+  public ExpandableByteBuf(ByteBuf buffer, BufferAllocator allocator) {
+    super(buffer);
+    this.allocator = allocator;
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    return new ExpandableByteBuf(buffer.copy(index, length), allocator);
+  }
+
+  @Override
+  public ByteBuf capacity(int newCapacity) {
+    if (newCapacity > capacity()) {
+      ByteBuf newBuf = allocator.buffer(newCapacity);
+      newBuf.writeBytes(buffer, 0, buffer.capacity());
+      newBuf.readerIndex(buffer.readerIndex());
+      newBuf.writerIndex(buffer.writerIndex());
+      buffer.release();
+      buffer = newBuf;
+      return newBuf;
+    } else {
+      return super.capacity(newCapacity);
+    }
+  }
+
+}