You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/06 23:42:07 UTC

[07/15] git commit: DRILL-620: Memory consumption fixes

DRILL-620: Memory consumption fixes

accounting fixes

trim buffers

switch to using setSafe and copySafe methods only

adaptive allocation

operator based allocator wip

handle OOM

Operator Context


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a2355d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a2355d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a2355d42

Branch: refs/heads/master
Commit: a2355d42dbff51b858fc28540915cf793f1c0fac
Parents: 70dddc5
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Apr 11 18:41:03 2014 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Sun May 4 18:46:37 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 12 ++-
 .../exec/store/hbase/HBaseScanBatchCreator.java |  2 +-
 .../java/io/netty/buffer/PooledByteBufL.java    |  4 +
 .../src/main/codegen/includes/vv_imports.ftl    |  1 +
 .../codegen/templates/FixedValueVectors.java    | 98 ++++++++++++++++----
 .../codegen/templates/NullableValueVectors.java | 69 ++++++++++++--
 .../codegen/templates/RepeatedValueVectors.java | 44 ++++++++-
 .../src/main/codegen/templates/TypeHelper.java  |  4 +-
 .../templates/VariableLengthVectors.java        | 98 ++++++++++++++++----
 .../org/apache/drill/exec/ExecConstants.java    |  3 +-
 .../cache/VectorAccessibleSerializable.java     |  3 +-
 .../apache/drill/exec/client/DrillClient.java   |  3 +
 .../org/apache/drill/exec/memory/Accountor.java | 17 +++-
 .../drill/exec/memory/AtomicRemainder.java      | 43 +++++++--
 .../drill/exec/memory/TopLevelAllocator.java    | 54 +++++++++--
 .../apache/drill/exec/ops/FragmentContext.java  |  7 ++
 .../apache/drill/exec/ops/OperatorContext.java  | 60 ++++++++++++
 .../drill/exec/physical/base/AbstractBase.java  | 12 +++
 .../exec/physical/base/AbstractGroupScan.java   | 15 ++-
 .../exec/physical/base/AbstractSubScan.java     |  3 +-
 .../exec/physical/base/PhysicalOperator.java    | 10 ++
 .../drill/exec/physical/impl/BatchCreator.java  |  1 +
 .../drill/exec/physical/impl/ScanBatch.java     | 29 +++---
 .../drill/exec/physical/impl/ScreenCreator.java | 11 +--
 .../exec/physical/impl/SingleSenderCreator.java |  2 +-
 .../exec/physical/impl/TopN/PriorityQueue.java  |  3 +-
 .../impl/TopN/PriorityQueueTemplate.java        | 10 +-
 .../exec/physical/impl/TopN/TopNBatch.java      | 22 +++--
 .../exec/physical/impl/WireRecordBatch.java     | 32 +++----
 .../physical/impl/aggregate/HashAggBatch.java   | 10 +-
 .../impl/aggregate/HashAggTemplate.java         | 12 ++-
 .../physical/impl/aggregate/HashAggregator.java |  3 +-
 .../impl/aggregate/StreamingAggBatch.java       | 15 ++-
 .../BroadcastSenderRootExec.java                |  2 +-
 .../physical/impl/common/ChainedHashTable.java  |  8 +-
 .../exec/physical/impl/common/HashTable.java    |  3 +-
 .../physical/impl/common/HashTableTemplate.java | 36 +++----
 .../impl/filter/FilterBatchCreator.java         |  1 +
 .../physical/impl/filter/FilterRecordBatch.java | 20 ++--
 .../exec/physical/impl/join/HashJoinBatch.java  | 37 +++++---
 .../exec/physical/impl/join/HashJoinHelper.java |  7 +-
 .../exec/physical/impl/join/HashJoinProbe.java  |  4 +-
 .../impl/join/HashJoinProbeTemplate.java        | 16 ++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 17 ++--
 .../impl/join/MergeJoinBatchBuilder.java        |  5 +-
 .../physical/impl/limit/LimitRecordBatch.java   | 10 +-
 .../MergingReceiverGeneratorBase.java           |  2 +-
 .../mergereceiver/MergingReceiverTemplate.java  |  2 +-
 .../impl/mergereceiver/MergingRecordBatch.java  | 62 ++++++++-----
 .../OrderedPartitionProjectorTemplate.java      |  4 +-
 .../OrderedPartitionRecordBatch.java            | 75 ++++++++++-----
 .../impl/orderedpartitioner/SampleCopier.java   |  2 +-
 .../SampleCopierTemplate.java                   |  9 +-
 .../partitionsender/OutgoingRecordBatch.java    | 10 +-
 .../PartitionSenderRootExec.java                | 19 +++-
 .../impl/project/ProjectRecordBatch.java        | 89 +++++++++++++++---
 .../exec/physical/impl/project/Projector.java   |  2 +-
 .../impl/project/ProjectorTemplate.java         | 17 +++-
 .../exec/physical/impl/sort/SortBatch.java      | 10 +-
 .../exec/physical/impl/svremover/Copier.java    |  2 +-
 .../impl/svremover/CopierTemplate2.java         | 16 ++--
 .../impl/svremover/CopierTemplate4.java         | 17 ++--
 .../impl/svremover/RemovingRecordBatch.java     | 92 +++++++++++++++---
 .../physical/impl/trace/TraceRecordBatch.java   |  6 +-
 .../physical/impl/union/UnionRecordBatch.java   | 13 +--
 .../exec/physical/impl/xsort/BatchGroup.java    |  2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  | 66 +++++++++----
 .../exec/physical/impl/xsort/MSortTemplate.java | 11 ++-
 .../drill/exec/physical/impl/xsort/MSorter.java |  3 +-
 .../impl/xsort/PriorityQueueCopier.java         |  3 +-
 .../impl/xsort/PriorityQueueCopierTemplate.java | 17 +++-
 .../impl/xsort/PriorityQueueSelector.java       |  3 +-
 .../xsort/PriorityQueueSelectorTemplate.java    |  7 +-
 .../impl/xsort/SingleBatchSorterTemplate.java   |  5 +
 .../exec/planner/fragment/Materializer.java     |  9 ++
 .../planner/fragment/SimpleParallelizer.java    |  4 +-
 .../exec/planner/fragment/StatsCollector.java   |  8 +-
 .../drill/exec/planner/fragment/Wrapper.java    | 15 +++
 .../exec/planner/logical/DrillRuleSets.java     | 14 +--
 .../drill/exec/planner/physical/SortPrel.java   |  8 +-
 .../drill/exec/record/AbstractRecordBatch.java  | 14 ++-
 .../exec/record/AbstractSingleRecordBatch.java  | 16 +++-
 .../exec/record/RawFragmentBatchProvider.java   |  2 +-
 .../apache/drill/exec/record/RecordBatch.java   |  3 +-
 .../apache/drill/exec/record/TransferPair.java  |  1 +
 .../apache/drill/exec/record/WritableBatch.java |  6 +-
 .../exec/record/selection/SelectionVector2.java | 10 +-
 .../exec/rpc/BasicClientWithConnection.java     |  2 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |  8 +-
 .../drill/exec/rpc/InboundRpcMessage.java       |  2 +-
 .../drill/exec/rpc/OutOfMemoryHandler.java      | 31 +++++++
 .../drill/exec/rpc/ProtobufLengthDecoder.java   | 14 ++-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |  3 +-
 .../drill/exec/rpc/control/ControlClient.java   |  9 +-
 .../control/ControlProtobufLengthDecoder.java   |  5 +-
 .../drill/exec/rpc/control/ControlServer.java   |  9 +-
 .../exec/rpc/data/BitServerConnection.java      | 83 +++++++++++++++--
 .../apache/drill/exec/rpc/data/DataClient.java  |  8 +-
 .../rpc/data/DataProtobufLengthDecoder.java     |  5 +-
 .../apache/drill/exec/rpc/data/DataServer.java  | 33 +++++--
 .../drill/exec/rpc/user/QueryResultHandler.java | 11 ++-
 .../apache/drill/exec/rpc/user/UserClient.java  |  8 +-
 .../rpc/user/UserProtobufLengthDecoder.java     |  5 +-
 .../apache/drill/exec/rpc/user/UserServer.java  | 12 +--
 .../apache/drill/exec/store/RecordReader.java   |  3 +
 .../apache/drill/exec/store/VectorHolder.java   | 11 +++
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  2 +-
 .../exec/store/direct/DirectBatchCreator.java   |  2 +-
 .../exec/store/easy/json/JSONRecordReader.java  | 21 ++---
 .../drill/exec/store/hive/HiveRecordReader.java | 23 +++--
 .../exec/store/hive/HiveScanBatchCreator.java   |  2 +-
 .../exec/store/hive/HiveTextRecordReader.java   |  6 +-
 .../store/ischema/InfoSchemaBatchCreator.java   |  2 +-
 .../exec/store/ischema/RowRecordReader.java     | 14 ++-
 .../drill/exec/store/mock/MockRecordReader.java | 16 ++--
 .../exec/store/mock/MockScanBatchCreator.java   |  2 +-
 .../exec/store/parquet/NullableBitReader.java   |  6 +-
 .../exec/store/parquet/ParquetRecordReader.java | 20 +---
 .../store/parquet/ParquetScanBatchCreator.java  |  2 +-
 .../drill/exec/vector/AllocationHelper.java     |  2 +-
 .../org/apache/drill/exec/vector/BitVector.java | 67 ++++++++++++-
 .../apache/drill/exec/vector/ValueVector.java   |  5 +
 .../org/apache/drill/exec/work/WorkManager.java | 13 +--
 .../exec/work/batch/AbstractDataCollector.java  |  5 +
 .../drill/exec/work/batch/IncomingBuffers.java  | 10 ++
 .../exec/work/batch/SpoolingRawBatchBuffer.java | 65 +++++++++++--
 .../work/batch/UnlimitedRawBatchBuffer.java     |  5 +
 .../exec/work/fragment/FragmentManager.java     |  9 ++
 .../work/fragment/NonRootFragmentManager.java   | 19 +++-
 .../exec/work/fragment/RootFragmentManager.java | 21 ++++-
 .../src/main/resources/drill-module.conf        | 11 +++
 .../drill/exec/cache/TestVectorCache.java       |  8 +-
 .../drill/exec/cache/TestWriteToDisk.java       |  8 +-
 .../exec/fn/impl/TestRepeatedFunction.java      |  1 +
 .../exec/physical/impl/SimpleRootExec.java      |  2 +-
 .../exec/physical/impl/TestCastFunctions.java   |  8 ++
 .../physical/impl/TestComparisonFunctions.java  |  5 +
 .../physical/impl/TestConvertFunctions.java     |  4 +
 .../drill/exec/physical/impl/TestDecimal.java   |  5 +-
 .../drill/exec/physical/impl/agg/TestAgg.java   |  1 +
 .../physical/impl/filter/TestSimpleFilter.java  |  5 +
 .../exec/physical/impl/join/TestHashJoin.java   |  5 +-
 .../impl/project/TestSimpleProjection.java      | 15 +--
 .../impl/trace/TestTraceMultiRecordBatch.java   |  2 +
 .../impl/trace/TestTraceOutputDump.java         |  2 +
 .../impl/xsort/TestSimpleExternalSort.java      | 51 +++++++++-
 .../apache/drill/exec/pop/PopUnitTestBase.java  |  1 +
 .../exec/record/vector/TestValueVector.java     | 10 +-
 .../exec/store/ischema/TestOrphanSchema.java    | 15 +--
 .../exec/store/ischema/TestTableProvider.java   | 12 ++-
 .../exec/store/json/JSONRecordReaderTest.java   | 90 ++++++------------
 .../exec/vector/TestAdaptiveAllocation.java     | 71 ++++++++++++++
 .../drill/exec/vector/TestSplitAndTransfer.java | 75 +++++++++++++++
 .../src/test/resources/drill-oom-xsort.conf     | 18 ++++
 .../src/test/resources/project/test1.json       |  9 +-
 .../src/test/resources/xsort/oom_sort_test.json | 57 ++++++++++++
 pom.xml                                         |  4 +-
 .../org/apache/drill/exec/proto/BitData.java    | 94 +++++++++++++++++--
 protocol/src/main/protobuf/BitData.proto        |  1 +
 159 files changed, 2083 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 3694b53..946ee40 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -31,6 +31,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -65,17 +67,19 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private Scan scan;
   private ResultScanner resultScanner;
   private FragmentContext context;
+  private BufferAllocator allocator;
   Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
   private Result leftOver;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
   private HTable table;
 
-  public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec e, List<SchemaPath> columns, FragmentContext context) {
+  public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec e, List<SchemaPath> columns, FragmentContext context) throws OutOfMemoryException {
     this.columns = columns;
     this.scan = new Scan(e.getStartRow(), e.getStopRow());
     this.scan.setFilter(e.getScanFilter());
     this.context = context;
+    this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
     if (columns != null && columns.size() != 0) {
       for (SchemaPath column : columns) {
         if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
@@ -125,11 +129,11 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       try {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
-          rowKeyVector = new VarBinaryVector(field, context.getAllocator());
+          rowKeyVector = new VarBinaryVector(field, allocator);
           output.addField(rowKeyVector);
         } else if (column.getRootSegment().getChild() != null){
           MaterializedField field = MaterializedField.create(column, Types.optional(TypeProtos.MinorType.VARBINARY));
-          NullableVarBinaryVector v = new NullableVarBinaryVector(field, context.getAllocator());
+          NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator);
           output.addField(v);
           String fullyQualified = column.getRootSegment().getPath() + "." + column.getRootSegment().getChild().getNameSegment().getPath();
           vvMap.put(new FamilyQualifierWrapper(fullyQualified), v);
@@ -213,7 +217,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   @SuppressWarnings("deprecation")
   private NullableVarBinaryVector addNewVector(String column) {
     MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY));
-    NullableVarBinaryVector v = new NullableVarBinaryVector(field, context.getAllocator());
+    NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator);
     VectorAllocator.getAllocator(v, 100).alloc(TARGET_RECORD_COUNT);
     vvMap.put(new FamilyQualifierWrapper(column), v);
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 7e38f5f..0a4eabe 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -47,6 +47,6 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(context, readers.iterator());
+    return new ScanBatch(subScan, context, readers.iterator());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
----------------------------------------------------------------------
diff --git a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
index 294f618..3f871e5 100644
--- a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
+++ b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
@@ -106,6 +106,10 @@ abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
     public final ByteBuf capacity(int newCapacity) {
         ensureAccessible();
 
+        if (chunk.parent == null) {
+          return this; //TODO figure out if this is the correct behavior
+        }
+
         // Check for the easy resizing cases, and return if successfully resized.
         if (chunk.unpooled) {
             if (newCapacity == length) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index bca6e3c..49c9ca2 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufInputStream;
 
 import org.apache.commons.lang3.ArrayUtils;
 
+import org.apache.drill.exec.memory.AccountingByteBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index d13fcdb..781b5a1 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -49,6 +49,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
+
+  private int allocationValueCount = 4000;
+  private int allocationMonitor = 0;
   
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
@@ -65,12 +68,24 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   public Mutator getMutator(){
     return mutator;
   }
-  
-  
 
-  /**
-   * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
-   * @param valueCount
+
+  public void allocateNew() {
+    clear();
+    if (allocationMonitor > 5) {
+      allocationValueCount = Math.max(2, (int) (allocationValueCount * 0.9));
+      allocationMonitor = 0;
+    } else if (allocationMonitor < -5) {
+      allocationValueCount = (int) (allocationValueCount * 1.1);
+      allocationMonitor = 0;
+    }
+    this.data = allocator.buffer(allocationValueCount * ${type.width});
+    this.data.readerIndex(0);
+  }
+
+    /**
+     * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
+     * @param valueCount
    */
   public void allocateNew(int valueCount) {
     clear();
@@ -121,6 +136,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     target.valueCount = valueCount;
     clear();
   }
+
+  public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
+    int currentWriterIndex = data.writerIndex();
+    int startPoint = startIndex * ${type.width};
+    int sliceLength = length * ${type.width};
+    target.data = this.data.slice(startPoint, sliceLength);
+    target.data.retain();
+  }
   
   private class TransferImpl implements TransferPair{
     ${minor.class}Vector to;
@@ -140,6 +163,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     public void transfer(){
       transferTo(to);
     }
+
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
     
     @Override
     public void copyValue(int fromIndex, int toIndex) {
@@ -147,9 +174,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
   }
   
-  public void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
     <#if (type.width > 8)>
-    data.getBytes(fromIndex * ${type.width}, from.data, thisIndex * ${type.width}, ${type.width});
+    from.data.getBytes(fromIndex * ${type.width}, data, thisIndex * ${type.width}, ${type.width});
     <#else> <#-- type.width <= 8 -->
     data.set${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width}, 
         from.data.get${(minor.javaType!type.javaType)?cap_first}(fromIndex * ${type.width})
@@ -477,14 +504,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      data.setBytes(index * ${type.width}, value, 0, ${type.width});
    }
 
+   public boolean setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
+     if(index >= getValueCapacity()) {
+       allocationMonitor--;
+       return false;
+     }
+     data.setBytes(index * ${type.width}, value, 0, ${type.width});
+     return true;
+   }
+
    <#if (minor.class == "TimeStampTZ")>
-   public void set(int index, ${minor.class}Holder holder){
+   protected void set(int index, ${minor.class}Holder holder){
      data.setLong((index * ${type.width}), holder.value);
      data.setInt(((index * ${type.width}) + ${minor.milliSecondsSize}), holder.index);
 
    }
 
-   void set(int index, Nullable${minor.class}Holder holder){
+   protected void set(int index, Nullable${minor.class}Holder holder){
      data.setLong((index * ${type.width}), holder.value);
      data.setInt(((index * ${type.width}) + ${minor.milliSecondsSize}), holder.index);
    }
@@ -501,14 +537,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      return true;
    }
    <#elseif (minor.class == "Interval")>
-   public void set(int index, ${minor.class}Holder holder){
+   protected void set(int index, ${minor.class}Holder holder){
      int offsetIndex = index * ${type.width};
      data.setInt(offsetIndex, holder.months);
      data.setInt((offsetIndex + ${minor.daysOffset}), holder.days);
      data.setInt((offsetIndex + ${minor.milliSecondsOffset}), holder.milliSeconds);
    }
 
-   void set(int index, Nullable${minor.class}Holder holder){
+   protected void set(int index, Nullable${minor.class}Holder holder){
      int offsetIndex = index * ${type.width};
      data.setInt(offsetIndex, holder.months);
      data.setInt((offsetIndex + ${minor.daysOffset}), holder.days);
@@ -527,13 +563,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      return true;
    }
    <#elseif (minor.class == "IntervalDay")>
-   public void set(int index, ${minor.class}Holder holder){
+   protected void set(int index, ${minor.class}Holder holder){
      int offsetIndex = index * ${type.width};
      data.setInt(offsetIndex, holder.days);
      data.setInt((offsetIndex + ${minor.milliSecondsOffset}), holder.milliSeconds);
    }
 
-   void set(int index, Nullable${minor.class}Holder holder){
+   protected void set(int index, Nullable${minor.class}Holder holder){
      int offsetIndex = index * ${type.width};
      data.setInt(offsetIndex, holder.days);
      data.setInt((offsetIndex + ${minor.milliSecondsOffset}), holder.milliSeconds);
@@ -595,7 +631,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
    }
 
    <#else>
-   public void set(int index, ${minor.class}Holder holder){
+   protected void set(int index, ${minor.class}Holder holder){
      data.setBytes(index * ${type.width}, holder.buffer, holder.start, ${type.width});
    }
    
@@ -605,7 +641,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      return true;
    }
 
-   void set(int index, Nullable${minor.class}Holder holder){
+   protected void set(int index, Nullable${minor.class}Holder holder){
      data.setBytes(index * ${type.width}, holder.buffer, holder.start, ${type.width});
    }
    </#if>
@@ -628,19 +664,34 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
    }
    
    public boolean setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-     if(index >= getValueCapacity()) return false;
+     if(index >= getValueCapacity()) {
+       allocationMonitor--;
+       return false;
+     }
      set(index, value);
      return true;
    }
 
-   public void set(int index, ${minor.class}Holder holder){
+   protected void set(int index, ${minor.class}Holder holder){
      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
    }
 
-   void set(int index, Nullable${minor.class}Holder holder){
+   public boolean setSafe(int index, ${minor.class}Holder holder){
+     if(index >= getValueCapacity()) return false;
+     set(index, holder);
+     return true;
+   }
+
+   protected void set(int index, Nullable${minor.class}Holder holder){
      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
    }
 
+   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
+     if(index >= getValueCapacity()) return false;
+     set(index, holder);
+     return true;
+   }
+
    @Override
    public void generateTestData(int size) {
      setValueCount(size);
@@ -659,8 +710,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   
    public void setValueCount(int valueCount) {
+     int currentValueCapacity = getValueCapacity();
      ${minor.class}Vector.this.valueCount = valueCount;
-     data.writerIndex(${type.width} * valueCount);
+     int idx = (${type.width} * valueCount);
+     if (((float) currentValueCapacity) / idx > 1.1) {
+       allocationMonitor++;
+     }
+     data.writerIndex(idx);
+     if (data instanceof AccountingByteBuf) {
+       data.capacity(idx);
+       data.writerIndex(idx);
+     }
    }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index d2209c1..6839b37 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -94,6 +94,14 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   @Override
+  public void allocateNew() {
+    values.allocateNew();
+    bits.allocateNew();
+    mutator.reset();
+    accessor.reset();
+  }
+
+  @Override
   public void allocateNew(int totalBytes, int valueCount) {
     values.allocateNew(totalBytes, valueCount);
     bits.allocateNew(valueCount);
@@ -135,7 +143,15 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
              .setBufferLength(getBufferSize())
              .build();
   }
-  
+
+  @Override
+  public void allocateNew() {
+    values.allocateNew();
+    bits.allocateNew();
+    mutator.reset();
+    accessor.reset();
+  }
+
   @Override
   public void allocateNew(int valueCount) {
     values.allocateNew(valueCount);
@@ -186,6 +202,14 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     </#if>
     clear();
   }
+
+  public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) {
+    bits.splitAndTransferTo(startIndex, length, target.bits);
+    values.splitAndTransferTo(startIndex, length, target.values);
+    <#if type.major == "VarLen">
+    target.mutator.lastSet = length - 1;
+    </#if>
+  }
   
   private class TransferImpl implements TransferPair{
     Nullable${minor.class}Vector to;
@@ -205,7 +229,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     public void transfer(){
       transferTo(to);
     }
-    
+
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
+
     @Override
     public void copyValue(int fromIndex, int toIndex) {
       to.copyFrom(fromIndex, toIndex, Nullable${minor.class}Vector.this);
@@ -230,14 +258,20 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   
-  public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+  protected void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     if (!from.getAccessor().isNull(fromIndex)) {
     mutator.set(thisIndex, from.getAccessor().get(fromIndex));
 }
   }
   
   public boolean copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
-    return bits.copyFromSafe(fromIndex, thisIndex, from.bits) && values.copyFromSafe(fromIndex, thisIndex, from.values);
+    boolean success = bits.copyFromSafe(fromIndex, thisIndex, from.bits) && values.copyFromSafe(fromIndex, thisIndex, from.values);
+<#if type.major == "VarLen">
+    if (success) {
+      mutator.lastSet = thisIndex;
+    }
+</#if>
+    return success;
   }
 
   
@@ -251,7 +285,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
      * @throws  NullValueException if the value is null
      */
     public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
-      assert !isNull(index);
+      assert !isNull(index) : "Tried to get null value";
       return values.getAccessor().get(index);
     }
 
@@ -384,11 +418,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
 
     //public boolean setSafe(int index, <#if type.major == "VarLen" || minor.class == "TimeStampTZ" || minor.class == "Interval" || minor.class == "IntervalDay">Nullable${minor.class}Holder <#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value){
 
-    <#if type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "TimeStampTZ" || minor.class == "Interval" || minor.class == "IntervalDay">
     public boolean setSafe(int index, Nullable${minor.class}Holder value) {
-    <#else>
-    public boolean setSafe(int index, ${minor.javaType!type.javaType} value) {
-    </#if>
 
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -407,7 +437,26 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
 
     }
 
-    
+    <#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "TimeStampTZ" || minor.class == "Interval" || minor.class == "IntervalDay")>
+      public boolean setSafe(int index, ${minor.javaType!type.javaType} value) {
+        <#if type.major == "VarLen">
+        for (int i = lastSet + 1; i < index; i++) {
+          values.getMutator().set(i, new byte[]{});
+        }
+        </#if>
+        boolean b1 = bits.getMutator().setSafe(index, 1);
+        boolean b2 = values.getMutator().setSafe(index, value);
+        if(b1 && b2){
+          setCount++;
+          <#if type.major == "VarLen">lastSet = index;</#if>
+          return true;
+        }else{
+          return false;
+        }
+      }
+
+    </#if>
+
     public void setValueCount(int valueCount) {
       assert valueCount >= 0;
       <#if type.major == "VarLen">

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 35bd480..7d10438 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -15,6 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+import java.lang.Override;
+
 <@pp.dropOutputFile />
 <#list vv.types as type>
 <#list type.minor as minor>
@@ -40,6 +43,7 @@ package org.apache.drill.exec.vector;
 
   private int parentValueCount;
   private int childValueCount;
+  protected int sliceOffset = 0;
   
   private final UInt4Vector offsets;   // offsets to start of each record
   private final ${minor.class}Vector values;
@@ -83,6 +87,14 @@ package org.apache.drill.exec.vector;
     target.childValueCount = childValueCount;
     clear();
   }
+
+  public void splitAndTransferTo(int startIndex, int length, Repeated${minor.class}Vector target) {
+   int startValue = offsets.getAccessor().get(startIndex);
+    int endValue = offsets.getAccessor().get(startIndex + length);
+    values.splitAndTransferTo(startValue, endValue - startValue, target.values);
+    offsets.splitAndTransferTo(startIndex, length, target.offsets);
+    sliceOffset = startIndex;
+  }
   
   private class TransferImpl implements TransferPair{
     Repeated${minor.class}Vector to;
@@ -102,6 +114,10 @@ package org.apache.drill.exec.vector;
     public void transfer(){
       transferTo(to);
     }
+
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
     
     @Override
     public void copyValue(int fromIndex, int toIndex) {
@@ -150,6 +166,14 @@ package org.apache.drill.exec.vector;
              .setBufferLength(getBufferSize())
              .build();
   }
+
+  public void allocateNew() {
+    offsets.allocateNew();
+    values.allocateNew();
+    mutator.reset();
+    accessor.reset();
+    sliceOffset = 0;
+  }
   
   public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
     offsets.allocateNew(parentValueCount+1);
@@ -157,6 +181,7 @@ package org.apache.drill.exec.vector;
     values.allocateNew(totalBytes, childValueCount);
     mutator.reset();
     accessor.reset();
+    sliceOffset = 0;
   }
   
   @Override
@@ -192,6 +217,15 @@ package org.apache.drill.exec.vector;
              .setBufferLength(getBufferSize())
              .build();
   }
+
+  @Override
+  public void allocateNew() {
+    clear();
+    offsets.allocateNew();
+    values.allocateNew();
+    mutator.reset();
+    accessor.reset();
+  }
   
   public void allocateNew(int parentValueCount, int childValueCount) {
     clear();
@@ -251,8 +285,8 @@ package org.apache.drill.exec.vector;
     
     public Object getObject(int index) {
       List<Object> vals = Lists.newArrayList();
-      int start = offsets.getAccessor().get(index);
-      int end = offsets.getAccessor().get(index+1);
+      int start = offsets.getAccessor().get(index) - sliceOffset;
+      int end = offsets.getAccessor().get(index+1) - sliceOffset;
       for(int i = start; i < end; i++){
         vals.add(values.getAccessor().getObject(i));
       }
@@ -270,7 +304,7 @@ package org.apache.drill.exec.vector;
     public <#if type.major == "VarLen">byte[]
            <#else>${minor.javaType!type.javaType}
            </#if> get(int index, int positionIndex) {
-      return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
+      return values.getAccessor().get(offsets.getAccessor().get(index) - sliceOffset + positionIndex);
     }
         
            
@@ -279,8 +313,8 @@ package org.apache.drill.exec.vector;
     }
     
     public void get(int index, Repeated${minor.class}Holder holder){
-      holder.start = offsets.getAccessor().get(index);
-      holder.end =  offsets.getAccessor().get(index+1);
+      holder.start = offsets.getAccessor().get(index) - sliceOffset;
+      holder.end =  offsets.getAccessor().get(index+1) - sliceOffset;
       holder.vector = values;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index bd89ce7..f17df04 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -66,7 +66,7 @@ public class TypeHelper {
     throw new UnsupportedOperationException();
   }
   
-  public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
+  public static Class<? extends ValueVector> getValueVectorClass(MinorType type, DataMode mode){
     switch (type) {
 <#list vv.types as type>
   <#list type.minor as minor>
@@ -162,7 +162,7 @@ public class TypeHelper {
 <#list vv.types as type>
   <#list type.minor as minor>
     case ${minor.class?upper_case} :
-      ((${minor.class}Vector) vector).getMutator().set(index, (${minor.class}Holder) holder);
+      ((${minor.class}Vector) vector).getMutator().setSafe(index, (${minor.class}Holder) holder);
       break;
   </#list>
 </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 9cec943..3905bce 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -45,7 +45,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   private final UInt${type.width}Vector offsetVector;
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
-  
+
+  private int allocationTotalByteCount = 40000;
+  private int allocationMonitor = 0;
+
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
     this.offsetVector = new UInt${type.width}Vector(null, allocator);
@@ -138,8 +141,20 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     target.valueCount = valueCount;
     clear();
   }
+
+  public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
+    int startPoint = this.offsetVector.getAccessor().get(startIndex);
+    int sliceLength = this.offsetVector.getAccessor().get(startIndex + length) - startPoint;
+    target.offsetVector.clear();
+    target.offsetVector.allocateNew(length + 1);
+    for (int i = 0; i < length + 1; i++) {
+      target.offsetVector.getMutator().set(i, this.offsetVector.getAccessor().get(startIndex + i) - startPoint);
+    }
+    target.data = this.data.slice(startPoint, sliceLength);
+    target.data.retain();
+  }
   
-  public void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
     int start = from.offsetVector.getAccessor().get(fromIndex);
     int end =   from.offsetVector.getAccessor().get(fromIndex+1);
     int len = end - start;
@@ -185,12 +200,32 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void transfer(){
       transferTo(to);
     }
+
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
     
     @Override
     public void copyValue(int fromIndex, int toIndex) {
       to.copyFrom(fromIndex, toIndex, ${minor.class}Vector.this);
     }
   }
+
+  @Override
+  public void allocateNew() {
+    clear();
+    if (allocationMonitor > 5) {
+      allocationTotalByteCount = Math.max(1, (int) (allocationTotalByteCount * 0.9));
+      allocationMonitor = 0;
+    } else if (allocationMonitor < -5) {
+      allocationTotalByteCount = (int) (allocationTotalByteCount * 1.1);
+      allocationMonitor = 0;
+    }
+    data = allocator.buffer(allocationTotalByteCount);
+    data.readerIndex(0);
+    offsetVector.allocateNew();
+    offsetVector.getMutator().set(0,0);
+  }
   
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
@@ -285,7 +320,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      * @param index   position of the bit to set
      * @param bytes   array of bytes to write
      */
-    public void set(int index, byte[] bytes) {
+    protected void set(int index, byte[] bytes) {
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
@@ -295,7 +330,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public boolean setSafe(int index, byte[] bytes) {
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
-      if (data.capacity() < currentOffset + bytes.length) return false;
+      if (data.capacity() < currentOffset + bytes.length) {
+        allocationMonitor--;
+        return false;
+      }
+      if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + bytes.length)) {
+        return false;
+      }
       offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
       data.setBytes(currentOffset, bytes, 0, bytes.length);
       return true;
@@ -309,7 +350,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      * @param start   start index of bytes to write
      * @param length  length of bytes to write
      */
-    public void set(int index, byte[] bytes, int start, int length) {
+    protected void set(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
@@ -321,8 +362,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
       int currentOffset = offsetVector.getAccessor().get(index);
 
-      if (data.capacity() < currentOffset + length) return false;
-
+      if (data.capacity() < currentOffset + length) {
+        allocationMonitor--;
+        return false;
+      }
       if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + length)) {
         return false;
       }
@@ -341,10 +384,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
       
-      if(data.capacity() < outputStart + len) return false;
+      if(data.capacity() < outputStart + len) {
+        allocationMonitor--;
+        return false;
+      }
       
       holder.buffer.getBytes(start, data, outputStart, len);
-      offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (index+1) * ${type.width},  outputStart + len);
+      if (!offsetVector.getMutator().setSafe( index+1,  outputStart + len)) {
+        return false;
+      }
+
+      set(index, holder);
 
       return true;
     }
@@ -358,15 +408,22 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
       
-      if(data.capacity() < outputStart + len) return false;
+      if(data.capacity() < outputStart + len) {
+        allocationMonitor--;
+        return false;
+      }
       
       holder.buffer.getBytes(start, data, outputStart, len);
-      offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (index+1) * ${type.width},  outputStart + len);
+      if (!offsetVector.getMutator().setSafe( index+1,  outputStart + len)) {
+        return false;
+      }
+
+      set(index, holder);
 
       return true;
     }
     
-    public void set(int index, int start, int length, ByteBuf buffer){
+    protected void set(int index, int start, int length, ByteBuf buffer){
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
@@ -374,14 +431,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data.setBytes(currentOffset, bb);
     }
 
-    void set(int index, Nullable${minor.class}Holder holder){
+    protected void set(int index, Nullable${minor.class}Holder holder){
       int length = holder.end - holder.start;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
     
-    public void set(int index, ${minor.class}Holder holder){
+    protected void set(int index, ${minor.class}Holder holder){
       int length = holder.end - holder.start;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
@@ -389,8 +446,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
     
     public void setValueCount(int valueCount) {
+      int currentByteCapacity = getByteCapacity();
       ${minor.class}Vector.this.valueCount = valueCount;
-      data.writerIndex(offsetVector.getAccessor().get(valueCount));
+      int idx = offsetVector.getAccessor().get(valueCount);
+      data.writerIndex(idx);
+      if (((float) currentByteCapacity) / idx > 1.1) {
+        allocationMonitor++;
+      }
+      if (data instanceof AccountingByteBuf) {
+        data.capacity(idx);
+        data.writerIndex(idx);
+      }
       offsetVector.getMutator().setValueCount(valueCount+1);
     }
 
@@ -413,4 +479,4 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
 </#if> <#-- type.major -->
 </#list>
-</#list>
\ No newline at end of file
+</#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index baef9b0..9eee08d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -62,5 +62,6 @@ public interface ExecConstants {
   public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
   public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
   public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
-  
+  public static final String TOP_LEVEL_MAX_ALLOC = "drill.exec.memory.top.max";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9511992..f4a6d7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -157,7 +157,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
     {
       svCount = sv2.getCount();
-      svBuf = sv2.getBuffer();
+      svBuf = sv2.getBuffer(); //this calls retain() internally
     }
 
     try
@@ -170,6 +170,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
       {
         svBuf.getBytes(0, output, svBuf.readableBytes());
         sv2.setBuffer(svBuf);
+        svBuf.release(); // sv2 now owns the buffer
         sv2.setRecordCount(svCount);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bbd3e42..fc650b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -268,6 +268,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     @Override
     public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
 //      logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", result.getHeader().getIsLastChunk(), result);
+      if (result.getHeader().getErrorCount() > 0) {
+        fail(new Exception(result.getHeader().getError(0).getMessage()));
+      }
       results.add(result);
       if(result.getHeader().getIsLastChunk()){
         future.set(results);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 0d19340..624042e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -39,10 +39,12 @@ public class Accountor {
   private final long total;
   private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
   private final FragmentHandle handle;
+  private Accountor parent;
 
   public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) {
     // TODO: fix preallocation stuff
     AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
+    this.parent = parent;
     this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated);
     this.total = max;
     this.handle = handle;
@@ -53,6 +55,13 @@ public class Accountor {
     }
   }
 
+  public long getAvailable() {
+    if (parent != null) {
+      return Math.min(parent.getAvailable(), getCapacity() - getAllocation());
+    }
+    return getCapacity() - getAllocation();
+  }
+
   public long getCapacity() {
     return total;
   }
@@ -62,9 +71,7 @@ public class Accountor {
   }
 
   public boolean reserve(long size) {
-    //TODO: for now, we won't stop reservation.
-    remainder.get(size);
-    return true;
+    return remainder.get(size);
   }
 
   public void forceAdditionalReservation(long size) {
@@ -89,7 +96,7 @@ public class Accountor {
       if(buf != null){
         DebugStackTrace dst = buffers.get(buf);
         if(dst == null) throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
-        dst.size =- size;
+        dst.size -= size;
         if(dst.size < 0){
           throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
         }
@@ -150,7 +157,7 @@ public class Accountor {
     
   }
 
-  private class DebugStackTrace {
+  public class DebugStackTrace {
 
     private StackTraceElement[] elements;
     private long size;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 8476b53..74849c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.memory;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -35,6 +38,7 @@ public class AtomicRemainder {
   private final long initTotal;
   private final long initShared;
   private final long initPrivate;
+  private boolean closed = false;
 
   public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
     this.parent = parent;
@@ -43,6 +47,7 @@ public class AtomicRemainder {
     this.initTotal = max;
     this.initShared = max - pre;
     this.initPrivate = pre;
+//    logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception());
   }
 
   public long getRemainder() {
@@ -60,25 +65,36 @@ public class AtomicRemainder {
    * @param size
    */
   public void forceGet(long size) {
-    if (DEBUG)
-      logger.info("Force get {}", size);
-    availableShared.addAndGet(size);
+    long newAvailableShared = availableShared.addAndGet(size);
+//    if (DEBUG)
+//      logger.info("Force get {}. a.s. {} a.p. {} hashcode: {}", size, availableShared, availablePrivate, hashCode(), new Exception());
+//    assert newAvailableShared <= initShared;
     if (parent != null)
       parent.forceGet(size);
   }
 
   public boolean get(long size) {
-    if (DEBUG)
-      logger.info("Get {}", size);
     if (availablePrivate.get() < 1) {
       // if there is no preallocated memory, we can operate normally.
 
+      // if there is a parent allocator, check it before allocating.
+      if (parent != null && !parent.get(size)) {
+        return false;
+      }
+
       // attempt to get shared memory, if fails, return false.
       long outcome = availableShared.addAndGet(-size);
+//      assert outcome <= initShared;
       if (outcome < 0) {
-        availableShared.addAndGet(size);
+        long newAvailableShared = availableShared.addAndGet(size);
+        assert newAvailableShared <= initShared;
+        if (parent != null) {
+          parent.returnAllocation(size);
+        }
         return false;
       } else {
+//        if (DEBUG)
+//          logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
         return true;
       }
 
@@ -86,6 +102,8 @@ public class AtomicRemainder {
       // if there is preallocated memory, use that first.
       long unaccount = availablePrivate.addAndGet(-size);
       if (unaccount >= 0) {
+//        if (DEBUG)
+//          logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
         return true;
       } else {
 
@@ -102,6 +120,8 @@ public class AtomicRemainder {
         if (account >= 0) {
           // we were succesful, move private back to zero (since we allocated using shared).
           availablePrivate.addAndGet(additionalSpaceNeeded);
+//          if (DEBUG)
+//            logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
           return true;
         } else {
           // we failed to get space from available shared. Return allocations to initial state.
@@ -122,26 +142,31 @@ public class AtomicRemainder {
    * @param size
    */
   public void returnAllocation(long size) {
-    if (DEBUG)
-      logger.info("Return allocation {}", size);
     long privateSize = availablePrivate.get();
     long privateChange = Math.min(size, initPrivate - privateSize);
     long sharedChange = size - privateChange;
     availablePrivate.addAndGet(privateChange);
     availableShared.addAndGet(sharedChange);
+//    if (DEBUG)
+//      logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
     if (parent != null) {
       parent.returnAllocation(sharedChange);
     }
+    assert getUsed() <= initTotal;
   }
 
   public void close() {
-    
+    if (closed) {
+      logger.warn("Tried to close remainder, but it has already been closed", new Exception());
+      return;
+    }
     if (availablePrivate.get() != initPrivate || availableShared.get() != initShared)
       throw new IllegalStateException(
           String
               .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get()));
     
     if(parent != null) parent.returnAllocation(initPrivate);
+    closed = true;
   }
 
   static final String ERROR = "Failure while closing accountor.  Expected private and shared pools to be set to initial values.  However, one or more were not.  Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d.";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index e71c9c9..0b2add2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,10 +22,13 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.buffer.PooledUnsafeDirectByteBufL;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.util.AssertionUtil;
 
@@ -40,6 +43,10 @@ public class TopLevelAllocator implements BufferAllocator {
   public TopLevelAllocator() {
     this(DrillConfig.getMaxDirectMemory());
   }
+
+  public TopLevelAllocator(DrillConfig config) {
+    this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)));
+  }
   
   public TopLevelAllocator(long maximumAllocation) {
     this.acct = new Accountor(null, null, maximumAllocation, 0);
@@ -50,7 +57,7 @@ public class TopLevelAllocator implements BufferAllocator {
     if(!acct.reserve(min)) return null;
     ByteBuf buffer = innerAllocator.directBuffer(min, max);
     AccountingByteBuf wrapped = new AccountingByteBuf(acct, (PooledUnsafeDirectByteBufL) buffer);
-    acct.reserved(buffer.capacity() - min, wrapped);
+    acct.reserved(min, wrapped);
     return wrapped;
   }
   
@@ -74,15 +81,19 @@ public class TopLevelAllocator implements BufferAllocator {
     if(!acct.reserve(initialReservation)){
       throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
     };
-    ChildAllocator allocator = new ChildAllocator(handle, acct, initialReservation, maximumReservation);
+    ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation);
     if(ENABLE_ACCOUNTING) children.add(allocator);
     return allocator;
   }
 
   @Override
   public void close() {
-    if(ENABLE_ACCOUNTING && !children.isEmpty()){
-      throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed.");
+    if (ENABLE_ACCOUNTING) {
+      for (ChildAllocator child : children) {
+        if (!child.isClosed()) {
+          throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed.");
+        }
+      }
     }
     acct.close();
   }
@@ -91,14 +102,20 @@ public class TopLevelAllocator implements BufferAllocator {
   private class ChildAllocator implements BufferAllocator{
 
     private Accountor childAcct;
-    
+    private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
+    private boolean closed = false;
+    private FragmentHandle handle;
+
     public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{
+      assert max >= pre;
       childAcct = new Accountor(handle, parentAccountor, max, pre);
+      this.handle = handle;
     }
     
     @Override
     public AccountingByteBuf buffer(int size, int max) {
       if(!childAcct.reserve(size)){
+        logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory());
         return null;
       };
       
@@ -121,9 +138,11 @@ public class TopLevelAllocator implements BufferAllocator {
     public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation)
         throws OutOfMemoryException {
       if(!childAcct.reserve(initialReservation)){
-        throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getCapacity() - childAcct.getAllocation()));
+        throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
       };
-      return new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+      ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+      this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
+      return newChildAllocator;
     }
 
     public PreAllocator getNewPreAllocator(){
@@ -132,7 +151,28 @@ public class TopLevelAllocator implements BufferAllocator {
 
     @Override
     public void close() {
+      if (ENABLE_ACCOUNTING) {
+        for (ChildAllocator child : children.keySet()) {
+          if (!child.isClosed()) {
+            StringBuilder sb = new StringBuilder();
+            StackTraceElement[] elements = children.get(child);
+            for (int i = 3; i < elements.length; i++) {
+              sb.append("\t\t");
+              sb.append(elements[i]);
+              sb.append("\n");
+            }
+            throw new IllegalStateException(String.format(
+                    "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
+                    handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString()));
+          }
+        }
+      }
       childAcct.close();
+      closed = true;
+    }
+
+    public boolean isClosed() {
+      return closed;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2035aa0..f3bcfef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -26,6 +26,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.tools.Frameworks;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -83,6 +84,7 @@ public class FragmentContext implements Closeable {
     this.queryStartTime = fragment.getQueryStartTime();
     this.rootFragmentTimeZone = fragment.getTimeZone();
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
+    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
     this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
   }
 
@@ -138,10 +140,15 @@ public class FragmentContext implements Closeable {
    * Get this fragment's allocator.
    * @return
    */
+  @Deprecated
   public BufferAllocator getAllocator() {
     return allocator;
   }
 
+  public BufferAllocator getNewChildAllocator(long initialReservation, long maximumReservation) throws OutOfMemoryException {
+    return allocator.getChildAllocator(getHandle(), initialReservation, maximumReservation);
+  }
+
   public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
     return getImplementationClass(cg.getCodeGenerator());
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
new file mode 100644
index 0000000..3b7b4c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.common.util.Hook.Closeable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public class OperatorContext implements Closeable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class);
+
+  private final BufferAllocator allocator;
+  private boolean closed = false;
+  private PhysicalOperator popConfig;
+
+  public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
+    this.popConfig = popConfig;
+  }
+
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      throw new UnsupportedOperationException("Operator context does not have an allocator");
+    }
+    return allocator;
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+      return;
+    }
+    logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+    if (allocator != null) {
+      allocator.close();
+    }
+    closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 7eced4d..a79cbc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -26,6 +26,8 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractBase implements PhysicalOperator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
 
+  protected long initialAllocation = 1000000L;
+  protected long maxAllocation = 10000000000L;
 
 
   @Override
@@ -48,5 +50,15 @@ public abstract class AbstractBase implements PhysicalOperator{
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.NONE;
   }
+
+  @Override
+  public long getInitialAllocation() {
+    return initialAllocation;
+  }
+
+  @Override
+  public long getMaxAllocation() {
+    return maxAllocation;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 69fc447..f4cee2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base;
 import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.expression.SchemaPath;
 
 import com.google.common.collect.Iterators;
@@ -45,6 +46,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    throw new UnsupportedOperationException(String.format("%s does not implmemnt clone(columns) method!", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  @JsonIgnore
+  public long getInitialAllocation() {
+    return 0;
+  }
+
+  @Override
+  @JsonIgnore
+  public long getMaxAllocation() {
+    return 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index 57b9c18..97334ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.collect.Iterators;
 
-public abstract class AbstractSubScan implements SubScan{
+public abstract class AbstractSubScan extends AbstractBase implements SubScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
 
   @Override
@@ -72,5 +72,4 @@ public abstract class AbstractSubScan implements SubScan{
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.NONE;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 66e1b46..db57922 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -87,4 +87,14 @@ public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
   @JsonIgnore
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
 
+  /**
+   * @return The memory to preallocate for this operator
+   */
+  public long getInitialAllocation();
+
+  /**
+   * @return The maximum memory this operator can allocate
+   */
+  public long getMaxAllocation();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index 79f5f13..b55abef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.RecordBatch;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index e93fbcc..73ed723 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -32,20 +32,19 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Maps;
@@ -58,12 +57,16 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
 public class ScanBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
+  private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+  private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
   final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
 
   private final VectorContainer container = new VectorContainer();
   private int recordCount;
   private boolean schemaChanged = true;
   private final FragmentContext context;
+  private final OperatorContext oContext;
   private Iterator<RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
@@ -74,12 +77,13 @@ public class ScanBatch implements RecordBatch {
   List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
 
-  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext())
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
     this.currentReader = readers.next();
+    this.oContext = new OperatorContext(subScanConfig, context);
     this.currentReader.setup(mutator);
     this.partitionColumns = partitionColumns.iterator();
     this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
@@ -89,8 +93,8 @@ public class ScanBatch implements RecordBatch {
     addPartitionVectors();
   }
 
-  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
-    this(context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+    this(subScanConfig, context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
   }
 
   @Override
@@ -173,7 +177,7 @@ public class ScanBatch implements RecordBatch {
         byte[] bytes = val.getBytes();
         AllocationHelper.allocate(v, recordCount, val.length());
         for (int j = 0; j < recordCount; j++) {
-          v.getMutator().set(j, bytes);
+          v.getMutator().setSafe(j, bytes);
         }
         v.getMutator().setValueCount(recordCount);
       } else {
@@ -239,7 +243,7 @@ public class ScanBatch implements RecordBatch {
     @SuppressWarnings("unchecked")
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      ValueVector v = TypeHelper.getNewVector(field, context.getAllocator());
+      ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator());
       if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
       addField(v);
       return (T) v;
@@ -259,6 +263,7 @@ public class ScanBatch implements RecordBatch {
 
   public void cleanup(){
     container.clear();
+    oContext.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2fc854a..a0ff28a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -70,11 +70,6 @@ public class ScreenCreator implements RootCreator<Screen>{
       this.connection = context.getConnection();
     }
     
-    private void closeAllocator(){
-      sendCount.waitForSendComplete();
-      context.getAllocator().close();
-    }
-    
     @Override
     public boolean next() {
       if(!ok){
@@ -86,7 +81,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
-          closeAllocator();
+          sendCount.waitForSendComplete();
           QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
@@ -101,7 +96,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           return false;
       }
       case NONE: {
-        closeAllocator();
+        sendCount.waitForSendComplete();
         context.getStats().batchesCompleted.inc(1);
         QueryResult header = QueryResult.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //
@@ -133,8 +128,8 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     @Override
     public void stop() {
-      incoming.cleanup();
       sendCount.waitForSendComplete();
+      incoming.cleanup();
     }
 
     private SendListener listener = new SendListener();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 17e233a..7679701 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -97,8 +97,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     @Override
     public void stop() {
       ok = false;
-      incoming.cleanup();
       sendCount.waitForSendComplete();
+      incoming.cleanup();
     }
     
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
index 90d51b6..c583664 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.TopN;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.VectorContainer;
@@ -26,7 +27,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public interface PriorityQueue {
   public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
-  public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException;
+  public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
   public void generate() throws SchemaChangeException;
   public VectorContainer getHyperBatch();
   public SelectionVector4 getHeapSv4();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index d2d8d30..e0e7e51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -38,16 +38,18 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
   private SelectionVector4 finalSv4;//This is for final sorted output
   private ExpandableHyperContainer hyperBatch;
   private FragmentContext context;
+  private BufferAllocator allocator;
   private int limit;
   private int queueSize = 0;
   private int batchCount = 0;
   private boolean hasSv2;
 
   @Override
-  public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException {
+  public void init(int limit, FragmentContext context, BufferAllocator allocator,  boolean hasSv2) throws SchemaChangeException {
     this.limit = limit;
     this.context = context;
-    BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+    this.allocator = allocator;
+    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
     preAlloc.preAllocate(4 * (limit + 1));
     heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
     this.hasSv2 = hasSv2;
@@ -64,7 +66,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
     this.hyperBatch = new ExpandableHyperContainer(newContainer);
     this.batchCount = hyperBatch.iterator().next().getValueVectors().length;
-    BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
     preAlloc.preAllocate(4 * (limit + 1));
     this.heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
     for (int i = 0; i < v4.getTotalCount(); i++) {
@@ -113,7 +115,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
   public void generate() throws SchemaChangeException {
     Stopwatch watch = new Stopwatch();
     watch.start();
-    BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
     preAlloc.preAllocate(4 * queueSize);
     finalSv4 = new SelectionVector4(preAlloc.getAllocation(), queueSize, 4000);
     for (int i = queueSize - 1; i >= 0; i--) {