Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:36 UTC

[13/13] drill git commit: DRILL-4134: Allocator Improvements

DRILL-4134: Allocator Improvements

- Make the allocator mostly lockless
- Change BaseAllocator maps to direct references
- Add documentation around the memory management model
- Move transfer and ownership methods to DrillBuf
- Improve debug messaging
- Fix/revert sort changes
- Remove unused fragment limit flag
- Add time to HistoricalLog events
- Remove reservation amount from the RootAllocator constructor (since it is not allowed)
- Fix concurrency issue where an allocator closing at the same moment as an incoming batch transfer caused leaked memory and/or query failure
- Add new AutoCloseables.close(Iterable<AutoCloseable>)
- Remove extraneous DataResponseHandler and its Impl (and update TestBitRpc to use a smarter mock of FragmentManager)
- Remove the concept of poison-pill record batches, using FragmentContext.isOverMemoryLimit() instead
- Update incoming data batches so that they are transferred under the protection of a close lock
- Improve field names in IncomingBuffers and move synchronization to the collectors rather than IncomingBuffers (also change decrementing to decrementToZero rather than a two-part check)

This closes #238.
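
As a reading aid, here is a minimal, hypothetical sketch of the allocation model the diffs below converge on: a root allocator built from DrillConfig, named child allocators created via newChildAllocator(name, initialReservation, maxAllocation), and buffers accounted against the child. The class name, allocator name, and byte values are illustrative, not taken from the patch.

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.memory.RootAllocatorFactory;

    import io.netty.buffer.DrillBuf;

    public class AllocatorUsageSketch {
      public static void main(String[] args) throws Exception {
        final DrillConfig config = DrillConfig.create();
        // The root is capped at min(max direct memory, drill.memory.top.max).
        final BufferAllocator root = RootAllocatorFactory.newRoot(config);

        // Children are now identified by a descriptive name plus a reservation and a limit,
        // replacing the old LimitConsumer/applyFragmentLimit arguments.
        final BufferAllocator frag = root.newChildAllocator("frag:0:0", 1_000_000, 20_000_000);

        final DrillBuf buf = frag.buffer(4096); // accounted against frag and, transitively, root
        buf.release();                          // return the memory before closing the allocator

        frag.close(); // close() fails if any memory is still outstanding
        root.close();
      }
    }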


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/809f4620
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/809f4620
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/809f4620

Branch: refs/heads/master
Commit: 809f4620d7d82c72240212de13b993049550959d
Parents: 53dcabe
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Nov 15 17:26:02 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Dec 21 23:52:32 2015 -0800

----------------------------------------------------------------------
 .../org/apache/drill/common/AutoCloseables.java |    2 +-
 .../org/apache/drill/common/HistoricalLog.java  |   68 +-
 .../org/apache/drill/common/StackTrace.java     |    9 +-
 .../common/concurrent/AutoCloseableLock.java    |   43 +
 .../hive/HiveDrillNativeScanBatchCreator.java   |    8 +-
 distribution/pom.xml                            |    5 -
 distribution/src/assemble/bin.xml               |    1 -
 exec/java-exec/pom.xml                          |    2 +-
 .../expr/fn/impl/StringFunctionHelpers.java     |    4 +-
 .../apache/drill/exec/expr/fn/impl/XXHash.java  |    4 +-
 .../drill/exec/memory/RootAllocatorFactory.java |   40 +
 .../drill/exec/ops/BufferManagerImpl.java       |    3 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   63 +-
 .../drill/exec/ops/OperatorContextImpl.java     |   17 +-
 .../org/apache/drill/exec/ops/QueryContext.java |   14 +-
 .../drill/exec/physical/impl/BaseRootExec.java  |    2 +-
 .../drill/exec/physical/impl/ScanBatch.java     |    2 +-
 .../exec/physical/impl/SingleSenderCreator.java |    3 +-
 .../BroadcastSenderRootExec.java                |    3 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |    5 +-
 .../PartitionSenderRootExec.java                |    3 +-
 .../impl/sort/SortRecordBatchBuilder.java       |   49 +-
 .../physical/impl/trace/TraceRecordBatch.java   |    7 +-
 .../impl/union/UnionAllRecordBatch.java         |   15 +-
 .../UnorderedReceiverBatch.java                 |    7 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   51 +-
 .../impl/xsort/PriorityQueueCopier.java         |    3 +-
 .../impl/xsort/PriorityQueueCopierTemplate.java |    8 +-
 .../drill/exec/record/AbstractRecordBatch.java  |    4 +-
 .../drill/exec/record/RecordBatchLoader.java    |    6 +-
 .../apache/drill/exec/record/WritableBatch.java |    5 +-
 .../drill/exec/rpc/control/ControlClient.java   |    2 +-
 .../drill/exec/rpc/control/ControlServer.java   |    2 +-
 .../apache/drill/exec/rpc/data/DataClient.java  |    2 +-
 .../exec/rpc/data/DataConnectionCreator.java    |   14 +-
 .../exec/rpc/data/DataResponseHandler.java      |   34 -
 .../exec/rpc/data/DataResponseHandlerImpl.java  |   58 -
 .../apache/drill/exec/rpc/data/DataServer.java  |  109 +-
 .../drill/exec/rpc/data/IncomingDataBatch.java  |   73 +
 .../apache/drill/exec/rpc/user/UserServer.java  |    4 +-
 .../org/apache/drill/exec/server/Drillbit.java  |    2 +-
 .../drill/exec/service/ServiceEngine.java       |    6 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   10 +-
 .../exec/store/parquet/ParquetRecordWriter.java |    2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |    6 +-
 .../drill/exec/store/sys/MemoryIterator.java    |    4 +-
 .../org/apache/drill/exec/work/WorkManager.java |    8 -
 .../exec/work/batch/AbstractDataCollector.java  |   15 +-
 .../exec/work/batch/BaseRawBatchBuffer.java     |   28 +-
 .../drill/exec/work/batch/IncomingBuffers.java  |   82 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   12 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    7 +-
 .../exec/work/fragment/FragmentManager.java     |    4 +-
 .../work/fragment/NonRootFragmentManager.java   |    4 +-
 .../exec/work/fragment/RootFragmentManager.java |    4 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   13 +-
 .../apache/drill/TestAllocationException.java   |   71 -
 .../drill/exec/memory/TestAllocators.java       |  273 ++-
 .../drill/exec/memory/TestBaseAllocator.java    |  651 -------
 .../exec/physical/impl/TestOptiqPlans.java      |    3 +-
 .../exec/record/vector/TestValueVector.java     |   23 +-
 .../apache/drill/exec/server/TestBitRpc.java    |   84 +-
 .../testing/TestCountDownLatchInjection.java    |   13 +-
 .../exec/testing/TestExceptionInjection.java    |   21 +-
 .../drill/exec/testing/TestPauseInjection.java  |   17 +-
 .../drill/exec/testing/TestResourceLeak.java    |   18 +-
 .../complex/writer/TestPromotableWriter.java    |    2 -
 exec/memory/base/pom.xml                        |   10 +
 .../src/main/java/io/netty/buffer/DrillBuf.java |  714 ++------
 .../java/io/netty/buffer/ExpandableByteBuf.java |   55 +
 .../java/io/netty/buffer/FakeAllocator.java     |  167 --
 .../main/java/io/netty/buffer/LargeBuffer.java  |  297 +---
 .../io/netty/buffer/MutableWrappedByteBuf.java  |  336 ++++
 .../netty/buffer/PooledByteBufAllocatorL.java   |  312 ++--
 .../netty/buffer/UnsafeDirectLittleEndian.java  |  316 ++--
 .../apache/drill/exec/memory/Accountant.java    |  274 +++
 .../org/apache/drill/exec/memory/Accountor.java |   43 -
 .../exec/memory/AllocationReservation.java      |   86 +
 .../exec/memory/AllocatorClosedException.java   |   31 +
 .../drill/exec/memory/AllocatorManager.java     |  356 ++++
 .../apache/drill/exec/memory/BaseAllocator.java |  739 ++++++++
 .../drill/exec/memory/BoundsChecking.java       |    1 +
 .../drill/exec/memory/BufferAllocator.java      |  155 +-
 .../drill/exec/memory/ChildAllocator.java       |   53 +
 .../exec/memory/DrillByteBufAllocator.java      |  141 ++
 .../apache/drill/exec/memory/LimitConsumer.java |   28 -
 .../java/org/apache/drill/exec/memory/README.md |  121 ++
 .../apache/drill/exec/memory/RootAllocator.java |   39 +
 .../apache/drill/exec/memory/package-info.java  |   24 +
 .../base/src/main/resources/drill-module.conf   |   25 +
 .../drill/exec/memory/TestAccountant.java       |  163 ++
 .../drill/exec/memory/TestBaseAllocator.java    |  645 +++++++
 .../apache/drill/exec/memory/TestEndianess.java |   43 +
 exec/memory/impl/pom.xml                        |   68 -
 .../apache/drill/exec/memory/AccountorImpl.java |  437 -----
 .../drill/exec/memory/AllocationPolicy.java     |   38 -
 .../exec/memory/AllocationPolicyAgent.java      |   69 -
 .../exec/memory/AllocationReservation.java      |  152 --
 .../exec/memory/AllocatorClosedException.java   |   31 -
 .../drill/exec/memory/AllocatorOwner.java       |   40 -
 .../exec/memory/AllocatorsStatsMXBean.java      |   34 -
 .../drill/exec/memory/AtomicRemainder.java      |  215 ---
 .../apache/drill/exec/memory/BaseAllocator.java | 1654 ------------------
 .../apache/drill/exec/memory/BufferLedger.java  |   94 -
 .../exec/memory/ChainedAllocatorOwner.java      |   60 -
 .../drill/exec/memory/ChildAllocator.java       |   47 -
 .../apache/drill/exec/memory/RootAllocator.java |  125 --
 .../drill/exec/memory/RootAllocatorFactory.java |   43 -
 .../drill/exec/memory/TopLevelAllocator.java    |  408 -----
 .../impl/src/main/resources/drill-module.conf   |   25 -
 .../apache/drill/exec/memory/TestEndianess.java |   43 -
 exec/memory/pom.xml                             |    1 -
 .../exec/rpc/BasicClientWithConnection.java     |    2 +-
 .../exec/expr/fn/impl/ByteFunctionHelpers.java  |    8 +-
 .../apache/drill/exec/util/DecimalUtility.java  |   23 +-
 .../org/apache/drill/exec/proto/BitData.java    |   94 +-
 .../apache/drill/exec/proto/SchemaBitData.java  |    7 -
 .../exec/proto/beans/FragmentRecordBatch.java   |   23 -
 protocol/src/main/protobuf/BitData.proto        |    1 -
 120 files changed, 4394 insertions(+), 6661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index 43cbbf5..fcdfe14 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -68,7 +68,7 @@ public class AutoCloseables {
    * Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one
    * @param autoCloseables the closeables to close
    */
-  public static void close(Collection<? extends AutoCloseable> ac) throws Exception {
+  public static void close(Iterable<? extends AutoCloseable> ac) throws Exception {
     Exception topLevelException = null;
     for (AutoCloseable closeable : ac) {
       try {

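A small, hypothetical usage sketch of the widened signature above; the StringWriter stand-ins are placeholders for the allocators, buffers, and contexts Drill actually closes this way.

    import java.io.StringWriter;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.drill.common.AutoCloseables;

    public class CloseIterableSketch {
      public static void main(String[] args) throws Exception {
        // Any Iterable<? extends AutoCloseable> is accepted now, not just a Collection.
        final List<AutoCloseable> resources =
            Arrays.<AutoCloseable>asList(new StringWriter(), new StringWriter());
        // Per the javadoc above, the first exception is rethrown and later ones are suppressed.
        AutoCloseables.close(resources);
      }
    }
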
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/HistoricalLog.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/HistoricalLog.java b/common/src/main/java/org/apache/drill/common/HistoricalLog.java
index 49b7c33..3331215 100644
--- a/common/src/main/java/org/apache/drill/common/HistoricalLog.java
+++ b/common/src/main/java/org/apache/drill/common/HistoricalLog.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.common;
 
+import java.util.Arrays;
 import java.util.LinkedList;
 
 import org.slf4j.Logger;
@@ -30,9 +31,11 @@ public class HistoricalLog {
   private static class Event {
     private final String note; // the event text
     private final StackTrace stackTrace; // where the event occurred
+    private final long time;
 
     public Event(final String note) {
       this.note = note;
+      this.time = System.nanoTime();
       stackTrace = new StackTrace();
     }
   }
@@ -106,8 +109,8 @@ public class HistoricalLog {
    *
    * @param sb {@link StringBuilder} to write to
    */
-  public void buildHistory(final StringBuilder sb) {
-    buildHistory(sb, null);
+  public void buildHistory(final StringBuilder sb, boolean includeStackTrace) {
+    buildHistory(sb, 0, includeStackTrace);
   }
 
   /**
@@ -119,34 +122,50 @@ public class HistoricalLog {
    * @param additional an extra string that will be written between the identifying
    *     information and the history; often used for a current piece of state
    */
-  public synchronized void buildHistory(final StringBuilder sb, final CharSequence additional) {
-    sb.append('\n')
-        .append(idString);
 
-    if (additional != null) {
-      sb.append('\n')
-          .append(additional)
-          .append('\n');
-    }
+  /**
+   *
+   * @param sb
+   * @param indent
+   * @param includeStackTrace
+   */
+  public synchronized void buildHistory(final StringBuilder sb, int indent, boolean includeStackTrace) {
+    final char[] indentation = new char[indent];
+    final char[] innerIndentation = new char[indent + 2];
+    Arrays.fill(indentation, ' ');
+    Arrays.fill(innerIndentation, ' ');
+
+    sb.append(indentation)
+        .append("event log for: ")
+        .append(idString)
+        .append('\n');
 
-    sb.append(" event log\n");
 
     if (firstEvent != null) {
-      sb.append("  ")
+      sb.append(innerIndentation)
+          .append(firstEvent.time)
+          .append(' ')
           .append(firstEvent.note)
           .append('\n');
-      firstEvent.stackTrace.writeToBuilder(sb, 4);
+      if (includeStackTrace) {
+        firstEvent.stackTrace.writeToBuilder(sb, indent + 2);
+      }
 
       for(final Event event : history) {
         if (event == firstEvent) {
           continue;
         }
-
-        sb.append("  ")
+        sb.append(innerIndentation)
+            .append("  ")
+            .append(event.time)
+            .append(' ')
             .append(event.note)
             .append('\n');
 
-        event.stackTrace.writeToBuilder(sb, 4);
+        if (includeStackTrace) {
+          event.stackTrace.writeToBuilder(sb, indent + 2);
+          sb.append('\n');
+        }
       }
     }
   }
@@ -157,23 +176,10 @@ public class HistoricalLog {
    * events with their stack traces.
    *
    * @param logger {@link Logger} to write to
-   * @param additional an extra string that will be written between the identifying
-   *     information and the history; often used for a current piece of state
    */
-  public void logHistory(final Logger logger, final CharSequence additional) {
+  public void logHistory(final Logger logger) {
     final StringBuilder sb = new StringBuilder();
-    buildHistory(sb, additional);
+    buildHistory(sb, 0, true);
     logger.debug(sb.toString());
   }
-
-  /**
-   * Write the history of this object to the given {@link Logger}. The history
-   * includes the identifying string provided at construction time, and all the recorded
-   * events with their stack traces.
-   *
-   * @param logger {@link Logger} to write to
-   */
-  public void logHistory(final Logger logger) {
-    logHistory(logger, null);
-  }
 }

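A hedged sketch of the reworked HistoricalLog API above: each event now records System.nanoTime(), buildHistory() takes an indent level and an includeStackTrace flag, and logHistory(Logger) always includes stack traces. The constructor and recordEvent() calls are assumed from the unmodified parts of the class, and the identifier and event names are illustrative.

    import org.apache.drill.common.HistoricalLog;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HistoricalLogSketch {
      private static final Logger logger = LoggerFactory.getLogger(HistoricalLogSketch.class);

      public static void main(String[] args) {
        final HistoricalLog log = new HistoricalLog("DrillBuf[7]");
        log.recordEvent("create");
        log.recordEvent("retain");

        final StringBuilder sb = new StringBuilder();
        log.buildHistory(sb, 0, false); // indent 0, omit stack traces
        System.out.println(sb);

        log.logHistory(logger);         // debug-level dump, stack traces included
      }
    }
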
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/StackTrace.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/StackTrace.java b/common/src/main/java/org/apache/drill/common/StackTrace.java
index de64ed7..1dbffe6 100644
--- a/common/src/main/java/org/apache/drill/common/StackTrace.java
+++ b/common/src/main/java/org/apache/drill/common/StackTrace.java
@@ -36,13 +36,14 @@ public class StackTrace {
 
   /**
    * Write the stack trace to a StringBuilder.
-   *
-   * @param sb where to write it
-   * @param indent how many spaces to indent each line
+   * @param sb
+   *          where to write it
+   * @param indent
+   *          how many double spaces to indent each line
    */
   public void writeToBuilder(final StringBuilder sb, final int indent) {
     // create the indentation string
-    final char[] indentation = new char[indent];
+    final char[] indentation = new char[indent * 2];
     Arrays.fill(indentation, ' ');
 
     // write the stack trace in standard Java format

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
new file mode 100644
index 0000000..91d50b4
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Simple wrapper class that allows Locks to be released via a try-with-resources block.
+ */
+public class AutoCloseableLock implements AutoCloseable {
+
+  private final Lock lock;
+
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
+  }
+
+  public AutoCloseableLock open() {
+    lock.lock();
+    return this;
+  }
+
+  @Override
+  public void close() {
+    lock.unlock();
+  }
+
+}

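The new AutoCloseableLock above is what allows the close lock described in the commit message to be used with try-with-resources. A hypothetical read/write pattern in that spirit (class, field, and method names are illustrative, not the actual IncomingBuffers code):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.drill.common.concurrent.AutoCloseableLock;

    public class CloseLockSketch {
      private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
      private final AutoCloseableLock readLock = new AutoCloseableLock(rwLock.readLock());
      private final AutoCloseableLock writeLock = new AutoCloseableLock(rwLock.writeLock());

      void transferIncomingBatch() {
        // open() acquires the lock and returns the wrapper, so unlock() runs
        // automatically at the end of the try block, even on exception.
        try (AutoCloseableLock l = readLock.open()) {
          // ... hand the batch to its collector while close() is excluded ...
        }
      }

      void close() {
        try (AutoCloseableLock l = writeLock.open()) {
          // ... release resources; no transfer can race with this ...
        }
      }
    }
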
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 1a76136..9f53971 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -24,8 +24,6 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -55,6 +53,9 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 @SuppressWarnings("unused")
 public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
 
@@ -94,8 +95,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
       }
     }
 
-    final OperatorContext oContext = context.newOperatorContext(config,
-        false /* ScanBatch is not subject to fragment memory limit */);
+    final OperatorContext oContext = context.newOperatorContext(config);
 
     int currentPartitionIndex = 0;
     final List<RecordReader> readers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f8aced5..5aaf09d 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -45,11 +45,6 @@
       <version>${project.version}</version>
     </dependency>    
     <dependency>
-      <groupId>org.apache.drill.memory</groupId>
-      <artifactId>drill-memory-impl</artifactId>
-      <version>${project.version}</version>
-    </dependency> 
-    <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-rpc</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index f8ada9b..449ac6c 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -89,7 +89,6 @@
         <include>org.apache.drill:drill-logical:jar</include>
         <include>org.apache.drill.exec:vector:jar</include>
         <include>org.apache.drill.memory:drill-memory-base:jar</include>
-        <include>org.apache.drill.memory:drill-memory-impl:jar</include>
         <include>org.apache.drill.exec:drill-rpc:jar</include>
         <include>org.apache.drill.exec:drill-java-exec:jar</include>
         <include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core</include>

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 3cd2585..efb549c 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -289,7 +289,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.drill.memory</groupId>
-      <artifactId>drill-memory-impl</artifactId>
+      <artifactId>drill-memory-base</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
index c919f28..3d0f170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
@@ -30,8 +30,6 @@ import com.google.common.base.Charsets;
 public class StringFunctionHelpers {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringFunctionHelpers.class);
 
-  private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
-
   static final int RADIX = 10;
   static final long MAX_LONG = -Long.MAX_VALUE / RADIX;
   static final int MAX_INT = -Integer.MAX_VALUE / RADIX;
@@ -212,7 +210,7 @@ public class StringFunctionHelpers {
   private static final ISOChronology CHRONOLOGY = org.joda.time.chrono.ISOChronology.getInstanceUTC();
 
   public static long getDate(DrillBuf buf, int start, int end){
-    if(BOUNDS_CHECKING_ENABLED){
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       buf.checkBytes(start, end);
     }
     return memGetDate(buf.memoryAddress(), start, end);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
index 033e2cb..0124b2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
@@ -27,8 +27,6 @@ import com.google.common.primitives.UnsignedLongs;
 public final class XXHash {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(XXHash.class);
 
-  private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
-
   static final long PRIME64_1 = UnsignedLongs.decode("11400714785074694791");
   static final long PRIME64_2 = UnsignedLongs.decode("14029467366897019727");
   static final long PRIME64_3 = UnsignedLongs.decode("1609587929392839161");
@@ -168,7 +166,7 @@ public final class XXHash {
   }
 
   public static long hash64(int start, int end, DrillBuf buffer, long seed){
-    if(BOUNDS_CHECKING_ENABLED){
+    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
       buffer.checkBytes(start, end);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
new file mode 100644
index 0000000..4fad668
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.common.config.DrillConfig;
+
+public class RootAllocatorFactory {
+
+  public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";
+
+  /**
+   * Constructor to prevent instantiation of this static utility class.
+   */
+  private RootAllocatorFactory() {}
+
+  /**
+   * Create a new Root Allocator
+   * @param drillConfig
+   *          the DrillConfig
+   * @return a new root allocator
+   */
+  public static BufferAllocator newRoot(final DrillConfig drillConfig) {
+    return new RootAllocator(Math.min(DrillConfig.getMaxDirectMemory(), drillConfig.getLong(TOP_LEVEL_MAX_ALLOC)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index 003fc9b..b8c6796 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -55,9 +55,8 @@ public class BufferManagerImpl implements BufferManager {
   }
 
   public DrillBuf getManagedBuffer(int size) {
-    DrillBuf newBuf = allocator.buffer(size);
+    DrillBuf newBuf = allocator.buffer(size, this);
     managedBuffers.put(newBuf.memoryAddress(), newBuf);
-    newBuf.setBufferManager(this);
     return newBuf;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index a773c22..304ecf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.LimitConsumer;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -159,8 +158,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
     try {
-      allocator = context.getAllocator().getChildAllocator(new AsLimitConsumer(), fragment.getMemInitial(),
-          fragment.getMemMax(), true);
+      allocator = context.getAllocator().newChildAllocator(
+          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
+          fragment.getMemInitial(),
+          fragment.getMemMax());
       Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
     } catch (final OutOfMemoryException e) {
       throw UserException.memoryError(e)
@@ -174,31 +175,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     bufferManager = new BufferManagerImpl(this.allocator);
   }
 
-  private class AsLimitConsumer implements LimitConsumer {
-    final String identifier = QueryIdHelper.getFragmentId(fragment.getHandle());
-
-    @Override
-    public String getIdentifier() {
-      return identifier;
-    }
-
-    @Override
-    public long getAllocated() {
-      return allocator.getAllocatedMemory();
-    }
-
-    @Override
-    public long getLimit() {
-      return allocator.getLimit();
-    }
-
-    @Override
-    public void setLimit(long limit) {
-      allocator.setLimit(limit);
-    }
-
-  }
-
   /**
    * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    * the long list of test files.
@@ -297,11 +273,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return frag;
   }
 
-  public LimitConsumer asLimitConsumer() {
-    return new AsLimitConsumer();
-  }
-
-
   /**
    * Get this fragment's allocator.
    * @return the allocator
@@ -314,11 +285,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return allocator;
   }
 
-  public BufferAllocator getNewChildAllocator(final long initialReservation,
-                                              final long maximumReservation,
-                                              final boolean applyFragmentLimit) throws OutOfMemoryException {
-    return allocator.getChildAllocator(new AsLimitConsumer(), initialReservation, maximumReservation,
-        applyFragmentLimit);
+  public BufferAllocator getNewChildAllocator(final String operatorName,
+      final int operatorId,
+      final long initialReservation,
+      final long maximumReservation) throws OutOfMemoryException {
+    return allocator.newChildAllocator(
+        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
+        initialReservation,
+        maximumReservation
+        );
+  }
+
+  public boolean isOverMemoryLimit() {
+    return allocator.isOverLimit();
   }
 
   public <T> T getImplementationClass(final ClassGenerator<T> cg)
@@ -361,16 +340,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return buffers;
   }
 
-  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats, boolean applyFragmentLimit)
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
       throws OutOfMemoryException {
-    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats, applyFragmentLimit);
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
     contexts.add(context);
     return context;
   }
 
-  public OperatorContext newOperatorContext(PhysicalOperator popConfig, boolean applyFragmentLimit)
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig)
       throws OutOfMemoryException {
-    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, applyFragmentLimit);
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this);
     contexts.add(context);
     return context;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index a3ec6bd..8217afd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -48,7 +48,6 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private final PhysicalOperator popConfig;
   private final OperatorStats stats;
   private final BufferManager manager;
-  private final boolean applyFragmentLimit;
   private DrillFileSystem fs;
   private final ExecutorService executor;
 
@@ -60,21 +59,23 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
    */
   private ListeningExecutorService delegatePool;
 
-  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
-    this.applyFragmentLimit=applyFragmentLimit;
-    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
     this.popConfig = popConfig;
     this.manager = new BufferManagerImpl(allocator);
 
-    OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
+    OpProfileDef def =
+        new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
     stats = context.getStats().newOperatorStats(def, allocator);
     executionControls = context.getExecutionControls();
     executor = context.getDrillbitContext().getExecutor();
   }
 
-  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
-    this.applyFragmentLimit=applyFragmentLimit;
-    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
+      throws OutOfMemoryException {
+    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
     this.popConfig = popConfig;
     this.manager = new BufferManagerImpl(allocator);
     this.stats     = stats;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index e4f50a1..51a581a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -30,13 +30,14 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -52,8 +53,6 @@ import org.apache.drill.exec.util.Utilities;
 
 import com.google.common.collect.Lists;
 
-// TODO except for a couple of tests, this is only created by Foreman
-// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
 public class QueryContext implements AutoCloseable, OptimizerRulesContext {
@@ -80,7 +79,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
    */
   private boolean closed = false;
 
-  public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
+  public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) {
     this.drillbitContext = drillbitContext;
     this.session = session;
     queryOptions = new QueryOptionManager(session.getOptions());
@@ -92,9 +91,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
     queryContextInfo = Utilities.createQueryContextInfo(session.getDefaultSchemaName());
     contextInformation = new ContextInformation(session.getCredentials(), queryContextInfo);
 
-    allocator = drillbitContext.getAllocator().getChildAllocator(null, plannerSettings.getInitialPlanningMemorySize(),
-        plannerSettings.getPlanningMemoryLimit(), false);
-    // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
+    allocator = drillbitContext.getAllocator().newChildAllocator(
+        "query:" + QueryIdHelper.getQueryId(queryId),
+        PlannerSettings.getInitialPlanningMemorySize(),
+        plannerSettings.getPlanningMemoryLimit());
     bufferManager = new BufferManagerImpl(this.allocator);
     viewExpansionContext = new ViewExpansionContext(this);
     schemaTreesToClose = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index ee6475c..f720f8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -42,7 +42,7 @@ public abstract class BaseRootExec implements RootExec {
   private List<CloseableRecordBatch> operators;
 
   public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
-    this.oContext = fragmentContext.newOperatorContext(config, stats, true);
+    this.oContext = fragmentContext.newOperatorContext(config, stats);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
         oContext.getAllocator());

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6c763d6..73bebd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -134,7 +134,7 @@ public class ScanBatch implements CloseableRecordBatch {
                    Iterator<RecordReader> readers)
       throws ExecutionSetupException {
     this(subScanConfig, context,
-        context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
+        context.newOperatorContext(subScanConfig),
         readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 73f3f95..23e97d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -66,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
-      super(context, context.newOperatorContext(config, null, false), config);
+      super(context, context.newOperatorContext(config, null), config);
       this.incoming = batch;
       assert incoming != null;
       handle = context.getHandle();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c731e38..c88c72d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
 import java.util.List;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -61,7 +60,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  BroadcastSender config) throws OutOfMemoryException {
-    super(context, context.newOperatorContext(config, null, false), config);
+    super(context, context.newOperatorContext(config, null), config);
     this.ok = true;
     this.incoming = incoming;
     this.config = config;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 1cc598a..8db24af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -76,7 +77,6 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -126,8 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public MergingRecordBatch(final FragmentContext context,
                             final MergingReceiverPOP config,
                             final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
-    super(config, context, true, context.newOperatorContext(config, false));
-    //super(config, context);
+    super(config, context, true, context.newOperatorContext(config));
     this.fragProviders = fragProviders;
     this.context = context;
     this.outgoingContainer = new VectorContainer(oContext);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5e9e84c..cfe1b80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -100,7 +99,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
-    super(context, context.newOperatorContext(operator, null, false), operator);
+    super(context, context.newOperatorContext(operator, null), operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 9f452cb..f2302ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -21,18 +21,9 @@ import io.netty.buffer.DrillBuf;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
-import com.google.common.collect.Sets;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.AutoCloseablePointer;
-import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.AllocationReservation;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -48,7 +39,6 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.vector.complex.UnionVector;
 
 public class SortRecordBatchBuilder implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
@@ -56,12 +46,14 @@ public class SortRecordBatchBuilder implements AutoCloseable {
   private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
 
   private int recordCount;
-  private final AutoCloseablePointer<SelectionVector4> pSV4 = new AutoCloseablePointer<>();
-  private final AllocationReservation allocationReservation;
   private long runningBatches;
+  private SelectionVector4 sv4;
+  private BufferAllocator allocator;
+  final AllocationReservation reservation;
 
-  public SortRecordBatchBuilder(final BufferAllocator allocator) {
-    allocationReservation = allocator.newReservation();
+  public SortRecordBatchBuilder(BufferAllocator a) {
+    this.allocator = a;
+    this.reservation = a.newReservation();
   }
 
   private long getSize(VectorAccessible batch) {
@@ -94,7 +86,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     if (runningBatches >= Character.MAX_VALUE) {
       return false; // allowed in batch.
     }
-    if (!allocationReservation.add(batch.getRecordCount() * 4)) {
+    if (!reservation.add(batch.getRecordCount() * 4)) {
       return false;  // sv allocation available.
     }
 
@@ -117,7 +109,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
       logger.error(errMsg);
       throw new DrillRuntimeException(errMsg);
     }
-    if (!allocationReservation.add(rbd.getRecordCount() * 4)) {
+    if (!reservation.add(rbd.getRecordCount() * 4)) {
       final String errMsg = String.format("Failed to pre-allocate memory for SV. " + "Existing recordCount*4 = %d, " +
           "incoming batch recordCount*4 = %d", recordCount * 4, rbd.getRecordCount() * 4);
       logger.error(errMsg);
@@ -160,14 +152,13 @@ public class SortRecordBatchBuilder implements AutoCloseable {
       assert false : "Invalid to have an empty set of batches with no schemas.";
     }
 
-    final DrillBuf svBuffer = allocationReservation.buffer();
+    final DrillBuf svBuffer = reservation.allocateBuffer();
     if (svBuffer == null) {
       throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
     }
-    pSV4.assignNoChecked(new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE));
+    sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
-    final SelectionVector4 sv4 = pSV4.get();
 
     // now we're going to generate the sv4 pointers
     switch (schema.getSelectionVectorMode()) {
@@ -216,16 +207,21 @@ public class SortRecordBatchBuilder implements AutoCloseable {
   }
 
   public SelectionVector4 getSv4() {
-    return pSV4.get();
+    return sv4;
   }
 
   public void clear() {
-    DrillAutoCloseables.closeNoChecked(allocationReservation);
     for (RecordBatchData d : batches.values()) {
       d.container.clear();
     }
-    batches.clear();
-    pSV4.assignNoChecked(null);
+    if (sv4 != null) {
+      sv4.clear();
+    }
+  }
+
+  @Override
+  public void close() {
+    reservation.close();
   }
 
   public List<VectorContainer> getHeldRecordBatches() {
@@ -252,11 +248,4 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     // We need 4 bytes (SV4) for each record.
     return recordCount * 4;
   }
-
-  @Override
-  public void close() {
-    clear();
-    allocationReservation.close();
-    DrillAutoCloseables.closeNoChecked(pSV4);
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 78e83d6..ff83cc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -57,6 +58,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
 
   private SelectionVector2 sv = null;
 
+  private final BufferAllocator localAllocator;
+
   /* Tag associated with each trace operator */
   final String traceTag;
 
@@ -70,7 +73,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     super(pop, context, incoming);
     this.traceTag = pop.traceTag;
     logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
+    localAllocator = context.getNewChildAllocator("trace", 200, 0, Long.MAX_VALUE);
     String fileName = getFileName();
 
     /* Create the log file we will dump to and initialize the file descriptors */
@@ -116,7 +119,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    batch.reconstructContainer(container);
+    batch.reconstructContainer(localAllocator, container);
     if (incomingHasSv2) {
       sv = wrap.getSv2();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 0d47a57..55b9afe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -84,14 +83,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    if(current != null) {
-      current.kill(sendUpstream);
-      current = null;
-    }
-  }
-
-  @Override
   protected void killIncoming(boolean sendUpstream) {
     unionAllInput.getLeftRecordBatch().kill(sendUpstream);
     unionAllInput.getRightRecordBatch().kill(sendUpstream);
@@ -278,10 +269,8 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
 
   // This method is used by an inner class to clear the current record batch
   private void clearCurrentRecordBatch() {
-    if (current != null) {
-      for(final VectorWrapper<?> v: current) {
-        v.clear();
-      }
+    for(VectorWrapper<?> v: current) {
+      v.clear();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 00cf295..be7d4ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -79,7 +79,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     this.context = context;
     // In the normal case, batchLoader does not require an allocator. However, in the case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
-    oContext = context.newOperatorContext(config, false);
+    oContext = context.newOperatorContext(config);
     this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
     this.stats = oContext.getStats();
@@ -159,7 +159,8 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
         batch = getNextBatch();
 
         // skip over empty batches. we do this since these are basically control messages.
-        while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
+        while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
+            && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
           batch = getNextBatch();
         }
       } finally {
@@ -176,7 +177,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
         return IterOutcome.NONE;
       }
 
-      if (batch.getHeader().getIsOutOfMemory()) {
+      if (context.isOverMemoryLimit()) {
         return IterOutcome.OUT_OF_MEMORY;
       }
 

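With the sender-side out-of-memory flag gone from the batch header, the receiver now asks its own fragment context whether the fragment has exceeded its memory limit. A minimal sketch of that check; the helper class and method are illustrative, IterOutcome is the standard RecordBatch outcome enum:

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.record.RecordBatch.IterOutcome;

    class MemoryLimitSketch {
      // Memory pressure is detected locally via the fragment context rather than
      // signalled by the sender through the batch header.
      static IterOutcome checkMemory(FragmentContext context) {
        if (context.isOverMemoryLimit()) {
          return IterOutcome.OUT_OF_MEMORY;
        }
        return IterOutcome.OK;
      }
    }
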
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 1ac82bf..4dbd92d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -157,8 +157,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
     SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
     dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
-    copierAllocator = oContext.getAllocator().getChildAllocator(
-        context.asLimitConsumer(), PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION, true);
+    copierAllocator = oContext.getAllocator().newChildAllocator(oContext.getAllocator().getName() + ":copier",
+        PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
     FragmentHandle handle = context.getHandle();
     fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
         handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
@@ -211,18 +211,26 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       if (sv4 != null) {
         sv4.clear();
       }
-      if (copier != null) {
-        copier.close();
-      }
-      if (copierAllocator != null) {
+
+      try {
+        if (copier != null) {
+          copier.close();
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } finally {
         copierAllocator.close();
-      }
-      super.close();
+        super.close();
+
+        if (mSorter != null) {
+          mSorter.clear();
+        }
 
-      if(mSorter != null) {
-        mSorter.clear();
       }
+
     }
+
+
   }
 
   @Override
@@ -339,9 +347,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           SelectionVector2 sv2;
           if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
             sv2 = incoming.getSelectionVector2().clone();
-            if (sv2.getBuffer(false).isRootBuffer()) {
-              oContext.getAllocator().takeOwnership(sv2.getBuffer(false));
-            }
           } else {
             try {
               sv2 = newSV2();
@@ -473,10 +478,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         batchGroups.addAll(spilledBatchGroups);
         spilledBatchGroups = null; // no need to clean up spilledBatchGroups, all its batches are in batchGroups now
 
-        // copierAllocator is no longer needed now. Closing it will free memory for this operator
-        copierAllocator.close();
-        copierAllocator = null;
-
         logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
         createCopier(hyperBatch, batchGroups, container, false);
@@ -587,7 +588,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } finally {
       hyperBatch.clear();
     }
-    takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
     long bufSize = getBufferSize(c1);
     totalSizeInMemory += bufSize;
     logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
@@ -595,25 +595,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return newGroup;
   }
 
-  private void takeOwnership(VectorAccessible batch) {
-    for (VectorWrapper<?> w : batch) {
-      DrillBuf[] bufs = w.getValueVector().getBuffers(false);
-      for (DrillBuf buf : bufs) {
-        if (buf.isRootBuffer()) {
-          oContext.getAllocator().takeOwnership(buf);
-        }
-      }
-    }
-  }
-
   private long getBufferSize(VectorAccessible batch) {
     long size = 0;
     for (VectorWrapper<?> w : batch) {
       DrillBuf[] bufs = w.getValueVector().getBuffers(false);
       for (DrillBuf buf : bufs) {
-        if (buf.isRootBuffer()) {
-          size += buf.capacity();
-        }
+        size += buf.getPossibleMemoryConsumed();
       }
     }
     return size;

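Two allocator-facing changes show up in this file: the copier's allocator is now created with newChildAllocator(name, initReservation, maxAllocation) rather than the old limit-consumer overload, and buffer sizing uses DrillBuf.getPossibleMemoryConsumed() instead of filtering on root buffers. A minimal sketch of the new child-allocator pattern, with illustrative limits standing in for PriorityQueueCopier.INITIAL_ALLOCATION and MAX_ALLOCATION:

    import org.apache.drill.exec.memory.BufferAllocator;

    class CopierAllocatorSketch {
      // Create a named accounting scope under the operator's allocator; it must be
      // closed before its parent (the close() rework above does this in a finally block).
      static BufferAllocator newCopierAllocator(BufferAllocator parent, long initReservation, long maxAllocation) {
        return parent.newChildAllocator(parent.getName() + ":copier", initReservation, maxAllocation);
      }
    }
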
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index e0d9c2d..ec590c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
@@ -38,5 +39,5 @@ public interface PriorityQueueCopier extends AutoCloseable {
       new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
 
   @Override
-  abstract public void close(); // specify this to leave out the Exception
+  abstract public void close() throws IOException; // specify this to leave out the Exception
 }

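Since close() now declares IOException, callers either propagate the exception or wrap it, as the ExternalSortBatch.close() hunk above does. A minimal caller-side sketch; the helper class and method are illustrative:

    import java.io.IOException;

    import org.apache.drill.exec.physical.impl.xsort.PriorityQueueCopier;

    class CopierCloseSketch {
      // Close the copier, translating the new checked exception the same way
      // ExternalSortBatch does.
      static void closeCopier(PriorityQueueCopier copier) {
        if (copier == null) {
          return;
        }
        try {
          copier.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
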
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 891907a..5090b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Named;
@@ -66,7 +67,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     allocateVectors(targetRecordCount);
     for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
       if (queueSize == 0) {
-        close();
         return 0;
       }
       int compoundIndex = vector4.get(0);
@@ -96,7 +96,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
   }
 
   @Override
-  public void close() {
+  public void close() throws IOException {
     vector4.clear();
     for (final VectorWrapper<?> w: outgoing) {
       w.getValueVector().clear();
@@ -104,6 +104,10 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     for (final VectorWrapper<?> w : hyperBatch) {
       w.clear();
     }
+
+    for (BatchGroup batchGroup : batchGroups) {
+      batchGroup.close();
+    }
   }
 
   private void siftUp() {

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 229fe65..998665c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -46,11 +46,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected BatchState state;
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
-    this(popConfig, context, true, context.newOperatorContext(popConfig, true));
+    this(popConfig, context, true, context.newOperatorContext(popConfig));
   }
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
-    this(popConfig, context, buildSchema, context.newOperatorContext(popConfig, true));
+    this(popConfig, context, buildSchema, context.newOperatorContext(popConfig));
   }
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ed86358..13164b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -32,13 +32,12 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 /**
  * Holds record batch loaded from record batch message.
@@ -120,7 +119,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
         newVectors.add(vector);
       }
 
-      Preconditions.checkArgument(buf == null || bufOffset == buf.capacity());
 
       // rebuild the schema.
       final SchemaBuilder builder = BatchSchema.newBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 488ba69..d39ce5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -21,6 +21,7 @@ import io.netty.buffer.DrillBuf;
 
 import java.util.List;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -58,7 +59,7 @@ public class WritableBatch implements AutoCloseable {
     return buffers;
   }
 
-  public void reconstructContainer(VectorContainer container) {
+  public void reconstructContainer(BufferAllocator allocator, VectorContainer container) {
     Preconditions.checkState(!cleared,
         "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
     if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */
@@ -67,7 +68,7 @@ public class WritableBatch implements AutoCloseable {
         len += b.capacity();
       }
 
-      DrillBuf newBuf = buffers[0].getAllocator().buffer(len);
+      DrillBuf newBuf = allocator.buffer(len);
       try {
         /* Copy data from each buffer into the compound buffer */
         int offset = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index 915509f..cc238f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -52,7 +52,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler,
       BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
     super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getUnderlyingAllocator(),
+        context.getAllocator().getAsByteBufAllocator(),
         context.getBitLoopGroup(),
         RpcType.HANDSHAKE,
         BitControlHandshake.class,

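getUnderlyingAllocator() is replaced by getAsByteBufAllocator(), which hands Netty a ByteBufAllocator view of the Drill allocator rather than exposing an internal allocator object. A minimal sketch of using that view when configuring a Netty bootstrap; the bootstrap wiring is illustrative, only the getAsByteBufAllocator() call comes from the patch:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.ChannelOption;

    import org.apache.drill.exec.memory.BufferAllocator;

    class RpcAllocatorSketch {
      // Expose Drill's allocator to Netty through its ByteBufAllocator facade so
      // network buffers allocated via Netty are accounted by the Drill allocator.
      static void configure(Bootstrap bootstrap, BufferAllocator allocator) {
        ByteBufAllocator asNetty = allocator.getAsByteBufAllocator();
        bootstrap.option(ChannelOption.ALLOCATOR, asNetty);
      }
    }
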
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 81c886a..a786469 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -46,7 +46,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
     super(
         ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getUnderlyingAllocator(),
+        context.getAllocator().getAsByteBufAllocator(),
         context.getBitLoopGroup());
     this.handler = handler;
     this.connectionRegistry = connectionRegistry;

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index d7611b9..9db551b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -48,7 +48,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
   public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
     super(
         DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getUnderlyingAllocator(),
+        context.getAllocator().getAsByteBufAllocator(),
         context.getBitClientLoopGroup(),
         RpcType.HANDSHAKE,
         BitServerHandshake.class,

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 5874b31..b2a51ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -21,9 +21,11 @@ import java.io.Closeable;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
@@ -37,20 +39,23 @@ public class DataConnectionCreator implements Closeable {
   private volatile DataServer server;
   private final BootStrapContext context;
   private final WorkEventBus workBus;
-  private final DataResponseHandler dataHandler;
+  private final WorkerBee bee;
   private final boolean allowPortHunting;
   private ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap();
+  private final BufferAllocator dataAllocator;
 
-  public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) {
+  public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, WorkerBee bee, boolean allowPortHunting) {
     super();
     this.context = context;
     this.workBus = workBus;
-    this.dataHandler = dataHandler;
+    this.bee = bee;
     this.allowPortHunting = allowPortHunting;
+    this.dataAllocator = context.getAllocator()
+        .newChildAllocator("rpc-data", 0, Long.MAX_VALUE);
   }
 
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
-    server = new DataServer(context, workBus, dataHandler);
+    server = new DataServer(context, dataAllocator, workBus, bee);
     int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
     DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
     return completeEndpoint;
@@ -68,6 +73,7 @@ public class DataConnectionCreator implements Closeable {
   @Override
   public void close() {
     Closeables.closeQuietly(server);
+    dataAllocator.close();
   }
 
 }

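The data RPC layer now owns a dedicated child allocator named "rpc-data", created in the constructor with no initial reservation and an unbounded limit, and released in close(). A minimal sketch of that lifecycle as an AutoCloseable holder; the class and method names are illustrative:

    import org.apache.drill.exec.memory.BufferAllocator;

    class RpcDataAllocatorSketch implements AutoCloseable {
      private final BufferAllocator dataAllocator;

      RpcDataAllocatorSketch(BufferAllocator parent) {
        // Mirrors DataConnectionCreator above: 0 initial reservation, no upper bound.
        this.dataAllocator = parent.newChildAllocator("rpc-data", 0, Long.MAX_VALUE);
      }

      BufferAllocator getAllocator() {
        return dataAllocator;
      }

      @Override
      public void close() {
        // Release the child allocator when the owning component shuts down.
        dataAllocator.close();
      }
    }
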
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
deleted file mode 100644
index 39dcc90..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.data;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-
-public interface DataResponseHandler {
-
-  public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch,
-      DrillBuf data, AckSender sender) throws FragmentSetupException, IOException;
-
-  public void informOutOfMemory();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
deleted file mode 100644
index e18b94c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.data;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-
-public class DataResponseHandlerImpl implements DataResponseHandler{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
-  private final WorkerBee bee;
-
-  public DataResponseHandlerImpl(final WorkerBee bee) {
-    this.bee = bee;
-  }
-
-  @Override
-  public void informOutOfMemory() {
-    logger.error("Out of memory outside any particular fragment.");
-  }
-
-  @Override
-  public void handle(final FragmentManager manager, final FragmentRecordBatch fragmentBatch,
-      final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
-//      logger.debug("Fragment Batch received {}", fragmentBatch);
-
-    final boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
-    if (canRun) {
-//    logger.debug("Arriving batch means local batch can run, starting local batch.");
-      /*
-       * If we've reached the canRun threshold, we'll proceed. This expects handler.handle() to
-       * only return a single true.
-       */
-      bee.startFragmentPendingRemote(manager);
-    }
-  }
-}