You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/03 05:03:05 UTC

[3/4] drill git commit: DRILL-2826: Simplify and centralize Operator Cleanup

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
index 0df9491..f249540 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -32,7 +32,8 @@ public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException {
+  public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new ProjectRecordBatch(new Project(null, flatten.getChild()),
                                   children.iterator().next(),

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index cb1d4f1..e7a6b05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -31,7 +31,8 @@ public class ProjectBatchCreator implements BatchCreator<Project>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new ProjectRecordBatch(config, children.iterator().next(), context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 0a097c1..74b7d85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -84,10 +84,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     builder.clear();
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
index 217acf2..559558f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -31,7 +31,8 @@ public class SortBatchCreator implements BatchCreator<Sort>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException {
+  public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new SortBatch(config, context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1fa759c..aa9297e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -187,8 +187,8 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  public void cleanup(){
-    super.cleanup();
+  public void close(){
+    super.close();
   }
 
   private class StraightCopier implements Copier{

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 455a5f9..9ab39a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -31,7 +31,8 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
+  public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new RemovingRecordBatch(config, context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index 12afa33..40ef2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -30,7 +30,7 @@ public class TraceBatchCreator implements BatchCreator<Trace> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+  public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
       throws ExecutionSetupException {
     // Preconditions.checkArgument(children.size() == 1);
     return new TraceRecordBatch(config, children.iterator().next(), context);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 8a7d659..af45815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -155,7 +155,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     /* Release the selection vector */
     if (sv != null) {
       sv.clear();
@@ -167,8 +167,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     } catch (IOException e) {
       logger.error("Unable to close file descriptors for file: " + getFileName());
     }
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
index 7f7e110..1ef3142 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
@@ -31,7 +31,8 @@ public class UnionAllBatchCreator implements BatchCreator<UnionAll>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException {
+  public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() >= 1);
     return new UnionAllRecordBatch(config, children, context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 52b1794..d7ea3bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -20,15 +20,15 @@ package org.apache.drill.exec.physical.impl.union;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import com.google.common.collect.Lists;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -53,7 +54,8 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.physical.config.UnionAll;
+
+import com.google.common.collect.Lists;
 
 public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
@@ -132,12 +134,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     return WritableBatch.get(this);
   }
 
-  @Override
-  public void cleanup() {
-    super.cleanup();
-    unionAllInput.cleanup();
-  }
-
   private void setValueCount(int count) {
     for (ValueVector v : allocationVectors) {
       ValueVector.Mutator m = v.getMutator();
@@ -505,11 +501,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
       return rightSide.getRecordBatch();
     }
 
-    public void cleanup() {
-      leftSide.getRecordBatch().cleanup();
-      rightSide.getRecordBatch().cleanup();
-    }
-
     private class OneSideInput {
       private IterOutcome upstream = IterOutcome.NOT_YET;
       private RecordBatch recordBatch;

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 094865e..09cb7ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -37,9 +37,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
@@ -50,7 +50,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-public class UnorderedReceiverBatch implements RecordBatch {
+public class UnorderedReceiverBatch implements CloseableRecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
 
   private final RecordBatchLoader batchLoader;
@@ -60,7 +60,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private final OperatorStats stats;
   private boolean first = true;
   private final UnorderedReceiver config;
-  OperatorContext oContext;
+  private final OperatorContext oContext;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -77,7 +77,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
-    oContext = new OperatorContext(config, context, false);
+    oContext = context.newOperatorContext(config, false);
     this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
     this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
@@ -194,10 +194,9 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     batchLoader.clear();
     fragProvider.cleanup();
-    oContext.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index d9864f9..649ecd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -31,7 +31,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
+  public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children == null || children.isEmpty();
     IncomingBuffers bufHolder = context.getBuffers();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 66ec22f..2ae53aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.impl.validate;
 import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
@@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.VectorValidator;
 
-public class IteratorValidatorBatchIterator implements RecordBatch {
+public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
 
   static final boolean VALIDATE_VECTORS = false;
@@ -144,8 +144,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
   }
 
   @Override
-  public void cleanup() {
-    incoming.cleanup();
+  public void close() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 5d08afb..cc30326 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -31,7 +31,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, IteratorValidator config, List<RecordBatch> children)
+  public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config,
+      List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new IteratorValidatorBatchIterator(children.iterator().next());

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index d526a84..2298df5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -34,7 +34,7 @@ import com.google.common.collect.Iterators;
 
 public class ValuesBatchCreator implements BatchCreator<Values> {
   @Override
-  public RecordBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children)
+  public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children.isEmpty();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
index 285e2cd..59bc115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
@@ -18,19 +18,21 @@
 
 package org.apache.drill.exec.physical.impl.window;
 
-import com.google.common.base.Preconditions;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
 
 public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> {
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new WindowFrameRecordBatch(config, context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index bc86390..86d11d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -17,8 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.window;
 
-import com.google.common.collect.Lists;
-import com.sun.codemodel.JExpr;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -49,8 +50,8 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import java.io.IOException;
-import java.util.List;
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
 
 /**
  * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
@@ -333,13 +334,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (framer != null) {
       framer.cleanup();
       framer = null;
     }
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index ca93a72..e88bc67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import com.google.common.base.Joiner;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -68,6 +67,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -145,7 +145,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (batchGroups != null) {
       for (BatchGroup group: batchGroups) {
         try {
@@ -165,8 +165,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       copier.cleanup();
     }
     copierAllocator.close();
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
   public void buildSchema() throws SchemaChangeException {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index eb5d83b..b9f6396 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -31,7 +31,8 @@ public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ExternalSortBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new ExternalSortBatch(config, context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c96cb7c..4e348bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
   protected final VectorContainer container; //= new VectorContainer();
@@ -42,14 +42,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected BatchState state;
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
-    this(popConfig, context, true, new OperatorContext(popConfig, context, true));
+    this(popConfig, context, true, context.newOperatorContext(popConfig, true));
   }
 
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
-    this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
+    this(popConfig, context, buildSchema, context.newOperatorContext(popConfig, true));
   }
 
-  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
+      final OperatorContext oContext) throws OutOfMemoryException {
     super();
     this.context = context;
     this.popConfig = popConfig;
@@ -171,9 +172,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   protected abstract void killIncoming(boolean sendUpstream);
 
-  public void cleanup(){
+  public void close(){
     container.clear();
-    oContext.close();
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 3cfe177..dd90cab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -115,13 +115,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   }
 
   @Override
-  public void cleanup() {
-//    logger.debug("Cleaning up.");
-    super.cleanup();
-    incoming.cleanup();
-  }
-
-  @Override
   public BatchSchema getSchema() {
     if (container.hasSchema()) {
       return container.getSchema();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java
new file mode 100644
index 0000000..c52c3ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+public interface CloseableRecordBatch extends RecordBatch, AutoCloseable {
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 0a8ece5..6f10a1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -118,6 +117,4 @@ public interface RecordBatch extends VectorAccessible {
    */
   public WritableBatch getWritableBatch();
 
-  public void cleanup();
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 489a989..59999ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
@@ -47,7 +46,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index f1271b1..b4efe70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -43,11 +44,10 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
@@ -119,7 +119,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
       List<SchemaPath> columns) throws ExecutionSetupException;
 
-  RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+  CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
     String partitionDesignator = context.getOptions()
       .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     List<SchemaPath> columns = scan.getColumns();
@@ -153,9 +153,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
 
     int numParts = 0;
-    OperatorContext oContext = new OperatorContext(scan, context,
-        false /* ScanBatch is not subject to fragment memory limit */);
-    DrillFileSystem dfs;
+    OperatorContext oContext = context.newOperatorContext(scan, false /*
+                                                                       * ScanBatch is not subject to fragment memory
+                                                                       * limit
+                                                                       */);
+    final DrillFileSystem dfs;
     try {
       dfs = new DrillFileSystem(fsConf, oContext.getStats());
     } catch (IOException e) {
@@ -190,7 +192,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
 
-  public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
+  public CloseableRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
       throws ExecutionSetupException {
     try {
       return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
index ac0d2e7..f9dfd8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children == null || children.isEmpty();
     return config.getFormatPlugin().getReaderBatch(context, config);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
index c91ceba..bfb4188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index 84587a9..d59cda2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -30,7 +30,7 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
+  public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2cccc64..2666b2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index b38a33f..2ef2333 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -31,7 +31,8 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter());
     return new ScanBatch(config, context, Collections.singleton(rr).iterator());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 0bfd038..74423bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -34,7 +34,8 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<MockScanEntry> entries = config.getReadEntries();
     List<RecordReader> readers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 7298f53..cfa4c93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -45,7 +46,6 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -151,7 +151,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
     return recordWriter;
   }
 
-  public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer)
+  public WriterRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer)
           throws ExecutionSetupException {
     try {
       return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3e35721..3506ffa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -99,7 +99,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
-    this.oContext=new OperatorContext(writer, context, true);
+    this.oContext = context.newOperatorContext(writer, true);
   }
 
   @Override
@@ -331,9 +331,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     if (pageStore != null) {
       ColumnChunkPageWriteStoreExposer.close(pageStore);
     }
-    if(oContext!=null){
-      oContext.close();
-    }
 
     if (!hasRecords) {
       // the very last file is empty, delete it (DRILL-2408)

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 52dccd9..d5586ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -61,14 +61,17 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
   private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     String partitionDesignator = context.getOptions()
       .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     List<SchemaPath> columns = rowGroupScan.getColumns();
     List<RecordReader> readers = Lists.newArrayList();
-    OperatorContext oContext = new OperatorContext(rowGroupScan, context,
-        false /* ScanBatch is not subject to fragment memory limit */);
+    OperatorContext oContext = context.newOperatorContext(rowGroupScan, false /*
+                                                                               * ScanBatch is not subject to fragment
+                                                                               * memory limit
+                                                                               */);
 
     List<String[]> partitionColumns = Lists.newArrayList();
     List<Integer> selectedPartitionColumns = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
index 10dd26d..79c5709 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
+  public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4d837c1..921d134 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
@@ -69,9 +70,9 @@ import parquet.schema.MessageType;
 import parquet.schema.Type;
 import parquet.schema.PrimitiveType;
 
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import parquet.schema.Types;
 
 public class DrillParquetReader extends AbstractRecordReader {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 92f676a..58bf433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -39,7 +39,7 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
-  public RecordBatch getBatch(final FragmentContext context, final SystemTableScan scan,
+  public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan,
                               final List<RecordBatch> children)
     throws ExecutionSetupException {
     final SystemTable table = scan.getTable();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 3368412..87c78b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 85262de..d23655c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -105,9 +105,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   public void cleanup() {
     if (!isFinished() && context.shouldContinue()) {
       final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
-      logger.error(msg);
       final IllegalStateException e = new IllegalStateException(msg);
-      context.fail(e);
       throw e;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index edbcfde..4249cbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -212,19 +212,23 @@ public class Foreman implements Runnable {
       moveToState(QueryState.FAILED,
           new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
     } catch (final OutOfMemoryError e) {
-      /*
-       * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
-       * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
-       * attempt to notify them, which might not work under these conditions.
-       */
-      /*
-       * TODO this will kill everything in this JVM; why can't we just free all allocation
-       * associated with this Foreman and allow others to continue?
-       */
-      System.out.println("Out of memory, exiting.");
-      e.printStackTrace();
-      System.out.flush();
-      System.exit(-1);
+      if ("Direct buffer memory".equals(e.getMessage())) {
+        moveToState(QueryState.FAILED,
+            UserException.resourceError(e)
+                .message("One or more nodes ran out of memory while executing the query.")
+                .build());
+      } else {
+        /*
+         * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
+         * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
+         * them, which might not work under these conditions.
+         */
+        System.out.println("Node ran out of Heap memory, exiting.");
+        e.printStackTrace();
+        System.out.flush();
+        System.exit(-1);
+      }
+
     } finally {
       /*
        * Begin accepting external events.

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 0783fee..fb2045f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -272,12 +272,14 @@ public class FragmentExecutor implements Runnable {
 
   private void closeOutResources() {
 
+    // first close the operators and release all memory.
     try {
-      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+      root.close();
     } catch (final Exception e) {
       fail(e);
     }
 
+    // then close the fragment context.
     fragmentContext.close();
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index b02051b..d6e6d08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -25,7 +25,6 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Preconditions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -45,9 +44,9 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.VectorUtil;
 import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
@@ -57,6 +56,7 @@ import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.io.Resources;
 
 public class BaseTestQuery extends ExecTest {
@@ -219,7 +219,7 @@ public class BaseTestQuery extends ExecTest {
   }
 
   @AfterClass
-  public static void closeClient() throws IOException{
+  public static void closeClient() throws IOException, InterruptedException {
     if (client != null) {
       client.close();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
index 4258e60..13f9563 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
@@ -65,7 +65,7 @@ public class RunRootExec {
       }
       System.out.println("ENDITER: " + i);
       System.out.println("TIME: " + w.elapsed(TimeUnit.MILLISECONDS) + "ms");
-      exec.stop();
+      exec.close();
     }
     context.close();
     bit.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index f4f4966..7c58b19 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -88,7 +88,7 @@ public class DumpCatTest  extends ExecTest{
       }
       assertTrue(!context.isFailed());
 
-      exec.stop();
+      exec.close();
 
       FragmentHandle handle = context.getHandle();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
index 4942185..04e1980 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
@@ -17,7 +17,11 @@
  ******************************************************************************/
 package org.apache.drill.exec.fn.interp;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.antlr.runtime.ANTLRStringStream;
 import org.antlr.runtime.CommonTokenStream;
 import org.antlr.runtime.RecognitionException;
@@ -36,6 +40,7 @@ import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.BitControl;
 import org.apache.drill.exec.record.MaterializedField;
@@ -49,10 +54,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTime;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
 
 public class ExpressionInterpreterTest  extends PopUnitTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionInterpreterTest.class);
@@ -170,7 +172,7 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns);
     MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry));
 
-    RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
+    ScanBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
 
     batch.next();
 
@@ -184,13 +186,13 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     showValueVectorContent(vv);
 
     vv.clear();
-    batch.cleanup();
+    batch.close();
     batch.getContext().close();
     bit1.close();
   }
 
 
-  private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
+  private ScanBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
     List<RecordBatch> children = Lists.newArrayList();
     MockScanBatchCreator creator = new MockScanBatchCreator();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 1f0951b..74ce225 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -19,9 +19,13 @@
 package org.apache.drill.exec.memory;
 
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertEquals;
 import io.netty.buffer.DrillBuf;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecConstants;
@@ -41,11 +45,9 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 
 public class TestAllocators {
 
@@ -100,23 +102,25 @@ public class TestAllocators {
     OperatorStats stats;
 
     //Use some bogus operator type to create a new operator context.
-    def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(physicalOperator1));
+    def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+        OperatorContext.getChildCount(physicalOperator1));
     stats = fragmentContext1.getStats().getOperatorStats(def, fragmentContext1.getAllocator());
 
 
     // Add a couple of Operator Contexts
     // Initial allocation = 1000000 bytes for all operators
-    OperatorContext oContext11 = new OperatorContext(physicalOperator1, fragmentContext1, true);
+    OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1, true);
     DrillBuf b11=oContext11.getAllocator().buffer(1000000);
 
-    OperatorContext oContext12 = new OperatorContext(physicalOperator2, fragmentContext1, stats, true);
+    OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats, true);
     DrillBuf b12=oContext12.getAllocator().buffer(500000);
 
-    OperatorContext oContext21 = new OperatorContext(physicalOperator3, fragmentContext2, true);
+    OperatorContext oContext21 = fragmentContext2.newOperatorContext(physicalOperator3, true);
 
-    def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, OperatorContext.getChildCount(physicalOperator4));
+    def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
+        OperatorContext.getChildCount(physicalOperator4));
     stats = fragmentContext2.getStats().getOperatorStats(def, fragmentContext2.getAllocator());
-    OperatorContext oContext22 = new OperatorContext(physicalOperator4, fragmentContext2, stats, true);
+    OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true);
     DrillBuf b22=oContext22.getAllocator().buffer(2000000);
 
     // New Fragment begins
@@ -127,15 +131,16 @@ public class TestAllocators {
     FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
 
     // New fragment starts an operator that allocates an amount within the limit
-    def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, OperatorContext.getChildCount(physicalOperator5));
+    def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
+        OperatorContext.getChildCount(physicalOperator5));
     stats = fragmentContext3.getStats().getOperatorStats(def, fragmentContext3.getAllocator());
-    OperatorContext oContext31 = new OperatorContext(physicalOperator5, fragmentContext3, stats, true);
+    OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true);
 
     DrillBuf b31a = oContext31.getAllocator().buffer(200000);
 
     //Previously running operator completes
     b22.release();
-    oContext22.close();
+    closeOp(oContext22);
 
     // Fragment 3 asks for more and fails
     boolean outOfMem=false;
@@ -153,7 +158,7 @@ public class TestAllocators {
 
     // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
     outOfMem=false;
-    OperatorContext oContext32 = new OperatorContext(physicalOperator6, fragmentContext3, false);
+    OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false);
     DrillBuf b32=null;
     try {
       b32=oContext32.getAllocator().buffer(4400000);
@@ -165,17 +170,17 @@ public class TestAllocators {
       }else{
         outOfMem=true;
       }
-      oContext32.close();
+      closeOp(oContext32);
     }
     assertEquals(false, (boolean)outOfMem);
 
     b11.release();
-    oContext11.close();
+    closeOp(oContext11);
     b12.release();
-    oContext12.close();
-    oContext21.close();
+    closeOp(oContext12);
+    closeOp(oContext21);
     b31a.release();
-    oContext31.close();
+    closeOp(oContext31);
 
     fragmentContext1.close();
     fragmentContext2.close();
@@ -184,4 +189,8 @@ public class TestAllocators {
     bit.close();
     serviceSet.close();
   }
+
+  private void closeOp(OperatorContext c) throws Exception {
+    ((AutoCloseable) c).close(); // assumes c's runtime type implements AutoCloseable - TODO confirm
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 2536bbb..42d2193 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -101,7 +101,6 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
     switch (incoming.next()) {
     case NONE:
     case STOP:
-      incoming.cleanup();
       return false;
     default:
       return true;
@@ -109,8 +108,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @Override
-  public void stop() {
-    screenRoot.stop();
+  public void close() throws Exception {
+    screenRoot.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
index e5448ac..ffa8765 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
@@ -111,7 +111,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       assertEquals(5, count);
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -159,7 +159,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       assertEquals(5, count);
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -205,7 +205,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       assertEquals(5, count);
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -252,7 +252,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       assertEquals(5, count);
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -299,7 +299,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       assertEquals(5, count);
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -346,7 +346,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       }
       assertEquals(5, count);
     }
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -392,7 +392,7 @@ public class TestCastFunctions extends PopUnitTestBase{
       }
       assertEquals(5, count);
     }
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();
@@ -428,7 +428,7 @@ public class TestCastFunctions extends PopUnitTestBase{
     while(exec.next()){
     }
 
-    exec.stop();
+    exec.close();
 
     context.close();
     allocator.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 0f6fd43..c69c6f5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -80,7 +80,7 @@ public class TestComparisonFunctions extends ExecTest {
 //      }
     }
 
-    exec.stop();
+    exec.close();
     context.close();
 
     if (context.getFailureCause() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index a112d92..a069078 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -74,7 +74,7 @@ public class TestSimpleFilter extends ExecTest {
       assertEquals(50, exec.getRecordCount());
     }
 
-    exec.stop();
+    exec.close();
 
     if(context.getFailureCause() != null){
       throw context.getFailureCause();
@@ -106,7 +106,7 @@ public class TestSimpleFilter extends ExecTest {
       }
       recordCount += exec.getSelectionVector4().getCount();
     }
-    exec.stop();
+    exec.close();
     assertEquals(50, recordCount);
 
     if(context.getFailureCause() != null){

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
index ef3a330..6c067fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -93,7 +93,7 @@ public class TestHashJoin extends PopUnitTestBase {
     while (exec.next()) {
       totalRecordCount += exec.getRecordCount();
     }
-    exec.stop();
+    exec.close();
     assertEquals(expectedRows, totalRecordCount);
     System.out.println("Total Record Count: " + totalRecordCount);
     if (context.getFailureCause() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 46bcc60..6a6a7e0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.partitionsender;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -46,8 +49,8 @@ import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.pop.PopUnitTestBase;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.MetricValue;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -363,8 +366,8 @@ public class TestPartitionSender extends PlanTestBase {
       super(context, incoming, operator);
     }
 
-    public void close() {
-      oContext.close();
+    public void close() throws Exception {
+      ((AutoCloseable) oContext).close(); // NOTE(review): may now override RootExec.close() - confirm, add @Override
     }
 
     public int getNumberPartitions() {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index d0d4005..b82846e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -82,7 +82,7 @@ public class TestTraceMultiRecordBatch extends ExecTest {
           }
         }
 
-        exec.stop();
+        exec.close();
 
         if(context.getFailureCause() != null){
             throw context.getFailureCause();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index f6766b1..1cb72ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -93,7 +93,7 @@ public class TestTraceOutputDump extends ExecTest {
         while(exec.next()){
         }
 
-        exec.stop();
+        exec.close();
 
         if(context.getFailureCause() != null){
             throw context.getFailureCause();