You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/03 05:03:06 UTC

[4/4] drill git commit: DRILL-2826: Simplify and centralize Operator Cleanup

DRILL-2826: Simplify and centralize Operator Cleanup

- Remove cleanup method from RecordBatch interface
- Make OperatorContext creation and closing the management of FragmentContext
- Make OperatorContext an abstract class and the impl only available to FragmentContext
- Make RecordBatch closing the responsibility of the RootExec
- Make all closes be suppresing closes to maximize memory release in failure
- Add new CloseableRecordBatch interface used by RootExec
- Make RootExec AutoCloseable
- Update RecordBatchCreator to return CloseableRecordBatches so that RootExec can maintain list
- Generate list of operators through change in ImplCreator


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/88bb0519
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/88bb0519
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/88bb0519

Branch: refs/heads/master
Commit: 88bb05194b023467d590ac747ec5fa14d04249f5
Parents: 636177d
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Apr 18 16:40:02 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 2 19:33:54 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/StackTrace.java     |   2 +-
 .../common/exceptions/UserExceptionContext.java |   6 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java |   3 +-
 .../exec/store/hive/HiveScanBatchCreator.java   |   3 +-
 .../exec/store/mongo/MongoScanBatchCreator.java |   2 +-
 .../drill/exec/memory/TopLevelAllocator.java    |  21 +++-
 .../apache/drill/exec/ops/FragmentContext.java  |  28 ++++-
 .../apache/drill/exec/ops/OperatorContext.java  |  88 ++-------------
 .../drill/exec/ops/OperatorContextImpl.java     | 110 +++++++++++++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  |  28 ++++-
 .../drill/exec/physical/impl/BatchCreator.java  |   5 +-
 .../drill/exec/physical/impl/ImplCreator.java   |  45 ++++++--
 .../physical/impl/MergingReceiverCreator.java   |   2 +-
 .../drill/exec/physical/impl/RootExec.java      |   7 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  10 +-
 .../drill/exec/physical/impl/ScreenCreator.java |  17 ---
 .../exec/physical/impl/SingleSenderCreator.java |  10 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |   9 +-
 .../impl/TopN/TopNSortBatchCreator.java         |   3 +-
 .../exec/physical/impl/WriterRecordBatch.java   |   6 +-
 .../physical/impl/aggregate/HashAggBatch.java   |   8 +-
 .../impl/aggregate/HashAggBatchCreator.java     |   3 +-
 .../impl/aggregate/HashAggTemplate.java         |   9 +-
 .../impl/aggregate/StreamingAggBatch.java       |   8 +-
 .../aggregate/StreamingAggBatchCreator.java     |   3 +-
 .../BroadcastSenderRootExec.java                |   9 +-
 .../impl/filter/FilterBatchCreator.java         |   3 +-
 .../physical/impl/filter/FilterRecordBatch.java |   4 +-
 .../impl/flatten/FlattenBatchCreator.java       |   4 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   6 +-
 .../impl/join/HashJoinBatchCreator.java         |   3 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |   9 --
 .../physical/impl/join/MergeJoinCreator.java    |   3 +-
 .../physical/impl/join/NestedLoopJoinBatch.java |  21 ++--
 .../impl/join/NestedLoopJoinBatchCreator.java   |   7 +-
 .../physical/impl/limit/LimitBatchCreator.java  |   3 +-
 .../physical/impl/limit/LimitRecordBatch.java   |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   8 +-
 .../OrderedPartitionRecordBatch.java            |  11 +-
 .../PartitionSenderRootExec.java                |  11 +-
 .../impl/producer/ProducerConsumerBatch.java    |   5 +-
 .../producer/ProducerConsumerBatchCreator.java  |   3 +-
 .../impl/project/ComplexToJsonBatchCreator.java |   3 +-
 .../impl/project/ProjectBatchCreator.java       |   3 +-
 .../exec/physical/impl/sort/SortBatch.java      |   5 +-
 .../physical/impl/sort/SortBatchCreator.java    |   3 +-
 .../impl/svremover/RemovingRecordBatch.java     |   4 +-
 .../impl/svremover/SVRemoverCreator.java        |   3 +-
 .../physical/impl/trace/TraceBatchCreator.java  |   2 +-
 .../physical/impl/trace/TraceRecordBatch.java   |   5 +-
 .../impl/union/UnionAllBatchCreator.java        |   3 +-
 .../impl/union/UnionAllRecordBatch.java         |  19 +---
 .../UnorderedReceiverBatch.java                 |  11 +-
 .../UnorderedReceiverCreator.java               |   2 +-
 .../IteratorValidatorBatchIterator.java         |   7 +-
 .../impl/validate/IteratorValidatorCreator.java |   3 +-
 .../impl/values/ValuesBatchCreator.java         |   2 +-
 .../impl/window/WindowFrameBatchCreator.java    |   8 +-
 .../impl/window/WindowFrameRecordBatch.java     |  14 +--
 .../physical/impl/xsort/ExternalSortBatch.java  |   7 +-
 .../impl/xsort/ExternalSortBatchCreator.java    |   3 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  12 +-
 .../exec/record/AbstractSingleRecordBatch.java  |   7 --
 .../drill/exec/record/CloseableRecordBatch.java |  22 ++++
 .../apache/drill/exec/record/RecordBatch.java   |   3 -
 .../drill/exec/store/avro/AvroRecordReader.java |   2 -
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  16 +--
 .../store/dfs/easy/EasyReaderBatchCreator.java  |   3 +-
 .../store/dfs/easy/EasyWriterBatchCreator.java  |   3 +-
 .../exec/store/direct/DirectBatchCreator.java   |   2 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   1 +
 .../store/ischema/InfoSchemaBatchCreator.java   |   3 +-
 .../exec/store/mock/MockScanBatchCreator.java   |   3 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   4 +-
 .../exec/store/parquet/ParquetRecordWriter.java |   5 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   9 +-
 .../parquet/ParquetWriterBatchCreator.java      |   3 +-
 .../exec/store/parquet2/DrillParquetReader.java |   3 +-
 .../exec/store/sys/SystemTableBatchCreator.java |   2 +-
 .../exec/store/text/DrillTextRecordReader.java  |   1 +
 .../work/batch/UnlimitedRawBatchBuffer.java     |   2 -
 .../apache/drill/exec/work/foreman/Foreman.java |  30 ++---
 .../exec/work/fragment/FragmentExecutor.java    |   4 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   6 +-
 .../java/org/apache/drill/exec/RunRootExec.java |   2 +-
 .../apache/drill/exec/client/DumpCatTest.java   |   2 +-
 .../fn/interp/ExpressionInterpreterTest.java    |  18 +--
 .../drill/exec/memory/TestAllocators.java       |  51 +++++----
 .../exec/physical/impl/SimpleRootExec.java      |   5 +-
 .../exec/physical/impl/TestCastFunctions.java   |  16 +--
 .../physical/impl/TestComparisonFunctions.java  |   2 +-
 .../physical/impl/filter/TestSimpleFilter.java  |   4 +-
 .../exec/physical/impl/join/TestHashJoin.java   |   2 +-
 .../partitionsender/TestPartitionSender.java    |  11 +-
 .../impl/trace/TestTraceMultiRecordBatch.java   |   2 +-
 .../impl/trace/TestTraceOutputDump.java         |   2 +-
 96 files changed, 498 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/common/src/main/java/org/apache/drill/common/StackTrace.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/StackTrace.java b/common/src/main/java/org/apache/drill/common/StackTrace.java
index 454c3a8..54068ec 100644
--- a/common/src/main/java/org/apache/drill/common/StackTrace.java
+++ b/common/src/main/java/org/apache/drill/common/StackTrace.java
@@ -56,7 +56,7 @@ public class StackTrace {
         writer.write('.');
         writer.write(ste.getMethodName());
         writer.write(':');
-        writer.write(ste.getLineNumber());
+        writer.write(Integer.toString(ste.getLineNumber()));
         writer.write('\n');
       }
     } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
index 1054c7f..7acf81d 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
@@ -17,12 +17,12 @@
  */
 package org.apache.drill.common.exceptions;
 
-import org.apache.drill.exec.proto.CoordinationProtos;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.drill.exec.proto.CoordinationProtos;
+
 /**
  * Holds context information about a DrillUserException. We can add structured context information that will added
  * to the error message displayed to the client.
@@ -141,7 +141,7 @@ class UserExceptionContext {
     }
 
     // add identification infos
-    sb.append("\n[");
+    sb.append("\n[Error Id: ");
     sb.append(errorId).append(" ");
     if(endpoint != null) {
       sb.append("on ")

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 9256157..7f779df 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -35,7 +35,8 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseScanBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
     List<SchemaPath> columns = null;

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index a1273c0..e832d34 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -34,7 +34,8 @@ import com.google.common.collect.Lists;
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     List<RecordReader> readers = Lists.newArrayList();
     Table table = config.getTable();
     List<InputSplit> splits = config.getInputSplits();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index c4597b5..3a8a496 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -40,7 +40,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
       .getLogger(MongoScanBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, MongoSubScan subScan,
+  public ScanBatch getBatch(FragmentContext context, MongoSubScan subScan,
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index d22651e..22fcb8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -118,10 +118,15 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   @Override
-  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException {
+  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
+      boolean applyFragmentLimit) {
     if(!acct.reserve(initialReservation)){
       logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
-      throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
+      throw new OutOfMemoryRuntimeException(
+          String
+              .format(
+                  "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
+                  initialReservation, acct.getCapacity() - acct.getAllocation()));
     };
     logger.debug("New child allocator with initial reservation {}", initialReservation);
     ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
@@ -191,7 +196,7 @@ public class TopLevelAllocator implements BufferAllocator {
                           long pre,
                           Map<ChildAllocator,
                           StackTraceElement[]> map,
-                          boolean applyFragmentLimit) throws OutOfMemoryException{
+        boolean applyFragmentLimit) {
       assert max >= pre;
       this.applyFragmentLimit=applyFragmentLimit;
       DrillConfig drillConf = context != null ? context.getConfig() : null;
@@ -240,10 +245,14 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
-    public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit)
-        throws OutOfMemoryException {
+    public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
+        boolean applyFragmentLimit) {
       if (!childAcct.reserve(initialReservation)) {
-        throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
+        throw new OutOfMemoryRuntimeException(
+            String
+                .format(
+                    "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
+                    initialReservation, childAcct.getAvailable()));
       };
       logger.debug("New child allocator with initial reservation {}", initialReservation);
       ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e6d5acd..09a7568 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,7 +27,6 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.jdbc.SimpleCalciteSchema;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -36,6 +35,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -57,6 +57,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -66,6 +67,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
+  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
+
   private final DrillbitContext context;
   private final UserClientConnection connection; // is null if this context is for non-root fragment
   private final QueryContext queryContext; // is null if this context is for non-root fragment
@@ -145,8 +148,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
     try {
-      allocator = context.getAllocator().getChildAllocator(
-          this, fragment.getMemInitial(), fragment.getMemMax(), true);
+      allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
       Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
     } catch(final Throwable e) {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
@@ -314,6 +316,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return buffers;
   }
 
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats, boolean applyFragmentLimit)
+      throws OutOfMemoryException {
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats, applyFragmentLimit);
+    contexts.add(context);
+    return context;
+  }
+
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig, boolean applyFragmentLimit)
+      throws OutOfMemoryException {
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, applyFragmentLimit);
+    contexts.add(context);
+    return context;
+  }
+
   @VisibleForTesting
   @Deprecated
   public Throwable getFailureCause() {
@@ -359,6 +375,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   @Override
   public void close() {
     waitForSendComplete();
+
+    // close operator context
+    for (OperatorContextImpl opContext : contexts) {
+      suppressingClose(opContext);
+    }
+
     suppressingClose(bufferManager);
     suppressingClose(buffers);
     suppressingClose(allocator);

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index ccafa67..7cc52ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -21,57 +21,20 @@ import io.netty.buffer.DrillBuf;
 
 import java.util.Iterator;
 
-import org.apache.drill.common.util.Hook.Closeable;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
+public abstract class OperatorContext {
 
-public class OperatorContext implements Closeable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class);
+  public abstract DrillBuf replace(DrillBuf old, int newSize);
 
-  private final BufferAllocator allocator;
-  private boolean closed = false;
-  private PhysicalOperator popConfig;
-  private OperatorStats stats;
-  private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
-  private final boolean applyFragmentLimit;
+  public abstract DrillBuf getManagedBuffer();
 
-  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
-    this.applyFragmentLimit=applyFragmentLimit;
-    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
-    this.popConfig = popConfig;
+  public abstract DrillBuf getManagedBuffer(int size);
 
-    OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
-    this.stats = context.getStats().getOperatorStats(def, allocator);
-  }
-
-  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
-    this.applyFragmentLimit=applyFragmentLimit;
-    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
-    this.popConfig = popConfig;
-    this.stats     = stats;
-  }
-
-  public DrillBuf replace(DrillBuf old, int newSize) {
-    if (managedBuffers.remove(old.memoryAddress()) == null) {
-      throw new IllegalStateException("Tried to remove unmanaged buffer.");
-    }
-    old.release();
-    return getManagedBuffer(newSize);
-  }
+  public abstract BufferAllocator getAllocator();
 
-  public DrillBuf getManagedBuffer() {
-    return getManagedBuffer(256);
-  }
-
-  public DrillBuf getManagedBuffer(int size) {
-    DrillBuf newBuf = allocator.buffer(size);
-    managedBuffers.put(newBuf.memoryAddress(), newBuf);
-    newBuf.setOperatorContext(this);
-    return newBuf;
-  }
+  public abstract OperatorStats getStats();
 
   public static int getChildCount(PhysicalOperator popConfig) {
     Iterator<PhysicalOperator> iter = popConfig.iterator();
@@ -87,41 +50,4 @@ public class OperatorContext implements Closeable {
     return i;
   }
 
-  public BufferAllocator getAllocator() {
-    if (allocator == null) {
-      throw new UnsupportedOperationException("Operator context does not have an allocator");
-    }
-    return allocator;
-  }
-
-  public boolean isClosed() {
-    return closed;
-  }
-
-  @Override
-  public void close() {
-    if (closed) {
-      logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
-      return;
-    }
-    logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
-
-    // release managed buffers.
-    Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
-    for (int i =0; i < buffers.length; i++) {
-      if (managedBuffers.allocated[i]) {
-        ((DrillBuf)buffers[i]).release();
-      }
-    }
-
-    if (allocator != null) {
-      allocator.close();
-    }
-    closed = true;
-  }
-
-  public OperatorStats getStats() {
-    return stats;
-  }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
new file mode 100644
index 0000000..6dbd880
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+
+class OperatorContextImpl extends OperatorContext implements AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
+
+  private final BufferAllocator allocator;
+  private boolean closed = false;
+  private PhysicalOperator popConfig;
+  private OperatorStats stats;
+  private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
+  private final boolean applyFragmentLimit;
+
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
+    this.applyFragmentLimit=applyFragmentLimit;
+    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+    this.popConfig = popConfig;
+
+    OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
+    this.stats = context.getStats().getOperatorStats(def, allocator);
+  }
+
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
+    this.applyFragmentLimit=applyFragmentLimit;
+    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+    this.popConfig = popConfig;
+    this.stats     = stats;
+  }
+
+  public DrillBuf replace(DrillBuf old, int newSize) {
+    if (managedBuffers.remove(old.memoryAddress()) == null) {
+      throw new IllegalStateException("Tried to remove unmanaged buffer.");
+    }
+    old.release();
+    return getManagedBuffer(newSize);
+  }
+
+  public DrillBuf getManagedBuffer() {
+    return getManagedBuffer(256);
+  }
+
+  public DrillBuf getManagedBuffer(int size) {
+    DrillBuf newBuf = allocator.buffer(size);
+    managedBuffers.put(newBuf.memoryAddress(), newBuf);
+    newBuf.setOperatorContext(this);
+    return newBuf;
+  }
+
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      throw new UnsupportedOperationException("Operator context does not have an allocator");
+    }
+    return allocator;
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+      return;
+    }
+    logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+
+    // release managed buffers.
+    Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
+    for (int i =0; i < buffers.length; i++) {
+      if (managedBuffers.allocated[i]) {
+        ((DrillBuf)buffers[i]).release();
+      }
+    }
+
+    if (allocator != null) {
+      allocator.close();
+    }
+    closed = true;
+  }
+
+  public OperatorStats getStats() {
+    return stats;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 628dcd3..accce43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.util.List;
+
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
@@ -24,6 +26,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
@@ -33,9 +36,10 @@ public abstract class BaseRootExec implements RootExec {
   protected OperatorStats stats = null;
   protected OperatorContext oContext = null;
   protected FragmentContext fragmentContext = null;
+  private List<CloseableRecordBatch> operators;
 
   public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
-    this.oContext = new OperatorContext(config, fragmentContext, stats, true);
+    this.oContext = fragmentContext.newOperatorContext(config, stats, true);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
         oContext.getAllocator());
@@ -43,15 +47,20 @@ public abstract class BaseRootExec implements RootExec {
     this.fragmentContext = fragmentContext;
   }
 
-  public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext,
+      final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-      config.getOperatorType(), OperatorContext.getChildCount(config)),
+        config.getOperatorType(), OperatorContext.getChildCount(config)),
       oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;
   }
 
+  void setOperators(List<CloseableRecordBatch> operators) {
+    this.operators = operators;
+  }
+
   @Override
   public final boolean next() {
     // Stats should have been initialized
@@ -95,7 +104,7 @@ public abstract class BaseRootExec implements RootExec {
   }
 
   @Override
-  public void stop() {
+  public void close() throws Exception {
     // We want to account for the time spent waiting here as Wait time in the operator profile
     try {
       stats.startProcessing();
@@ -105,5 +114,16 @@ public abstract class BaseRootExec implements RootExec {
       stats.stopWait();
       stats.stopProcessing();
     }
+
+    // close all operators.
+    if (operators != null) {
+      for (CloseableRecordBatch b : operators) {
+        try {
+          b.close();
+        } catch (Exception e) {
+          fragmentContext.fail(e);
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index 1cf7da7..af99b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,13 +20,14 @@ package org.apache.drill.exec.physical.impl;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface BatchCreator<T extends PhysicalOperator> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
 
-  public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+  public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children)
+      throws ExecutionSetupException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 912dfd7..5cea748 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -27,14 +28,15 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Create RecordBatch tree (PhysicalOperator implementations) for a given PhysicalOperator tree.
@@ -42,15 +44,23 @@ import org.apache.hadoop.security.UserGroupInformation;
 public class ImplCreator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
-  private static final ImplCreator INSTANCE = new ImplCreator();
+  private RootExec root = null;
+  private LinkedList<CloseableRecordBatch> operators = Lists.newLinkedList();
 
   private ImplCreator() {}
 
+  private List<CloseableRecordBatch> getOperators() {
+    return operators;
+  }
+
   /**
    * Create and return fragment RootExec for given FragmentRoot. RootExec has one or more RecordBatches as children
    * (which may contain child RecordBatches and so on).
-   * @param context FragmentContext.
-   * @param root FragmentRoot.
+   *
+   * @param context
+   *          FragmentContext.
+   * @param root
+   *          FragmentRoot.
    * @return RootExec of fragment.
    * @throws ExecutionSetupException
    */
@@ -61,10 +71,16 @@ public class ImplCreator {
     if (AssertionUtil.isAssertionsEnabled()) {
       root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
     }
-
+    final ImplCreator creator = new ImplCreator();
     Stopwatch watch = new Stopwatch();
     watch.start();
-    final RootExec rootExec = INSTANCE.getRootExec(root, context);
+    final RootExec rootExec = creator.getRootExec(root, context);
+
+    // skip over this for SimpleRootExec (testing)
+    if (rootExec instanceof BaseRootExec) {
+      ((BaseRootExec) rootExec).setOperators(creator.getOperators());
+    }
+
     logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
     if (rootExec == null) {
       throw new ExecutionSetupException(
@@ -72,6 +88,7 @@ public class ImplCreator {
     }
 
     return rootExec;
+
   }
 
   /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
@@ -96,6 +113,7 @@ public class ImplCreator {
     }
   }
 
+
   /** Create a RecordBatch and its children for given PhysicalOperator */
   private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkNotNull(op);
@@ -107,7 +125,10 @@ public class ImplCreator {
       try {
         return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
           public RecordBatch run() throws Exception {
-            return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+            final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
+                context, op, childRecordBatches);
+            operators.addFirst(batch);
+            return batch;
           }
         });
       } catch (InterruptedException | IOException e) {
@@ -116,7 +137,10 @@ public class ImplCreator {
         throw new ExecutionSetupException(errMsg, e);
       }
     } else {
-      return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+      final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context,
+          op, childRecordBatches);
+      operators.addFirst(batch);
+      return batch;
     }
   }
 
@@ -141,4 +165,5 @@ public class ImplCreator {
 
     return children;
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index daef44c..1bf6c01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -32,7 +32,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context,
+  public MergingRecordBatch getBatch(FragmentContext context,
                               MergingReceiverPOP receiver,
                               List<RecordBatch> children)
       throws ExecutionSetupException, OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 8fd68b2..5e366fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
  * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
  * output nodes and storage nodes.  They are there driving force behind the completion of a query.
  */
-public interface RootExec {
+public interface RootExec extends AutoCloseable {
   /**
    * Do the next batch of work.
    * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
@@ -31,11 +31,6 @@ public interface RootExec {
   public boolean next();
 
   /**
-   * Inform all children to clean up and go away.
-   */
-  public void stop();
-
-  /**
    * Inform sender that receiving fragment is finished and doesn't need any more data
    * @param handle
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ca2a048..6ea43cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -38,8 +37,8 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -59,7 +58,7 @@ import com.google.common.collect.Maps;
 /**
  * Record batch used for a particular scan. Operators against one or more
  */
-public class ScanBatch implements RecordBatch {
+public class ScanBatch implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
   private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
@@ -115,7 +114,7 @@ public class ScanBatch implements RecordBatch {
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
     this(subScanConfig, context,
-        new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */),
+        context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
         readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 
@@ -343,7 +342,7 @@ public class ScanBatch implements RecordBatch {
     return WritableBatch.get(this);
   }
 
-  public void cleanup() {
+  public void close() {
     container.clear();
     if (tempContainer != null) {
       tempContainer.clear();
@@ -353,7 +352,6 @@ public class ScanBatch implements RecordBatch {
     }
     fieldVectorMap.clear();
     currentReader.cleanup();
-    oContext.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2069d35..5b4d7bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -81,12 +81,10 @@ public class ScreenCreator implements RootCreator<Screen>{
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
       case STOP:
-        this.internalStop();
         return false;
       case NONE:
         if (firstBatch) {
           // this is the only data message sent to the client and may contain the schema
-          this.internalStop();
           QueryWritableBatch batch;
           QueryData header = QueryData.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //
@@ -130,21 +128,6 @@ public class ScreenCreator implements RootCreator<Screen>{
       stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
     }
 
-
-    private void internalStop(){
-      oContext.close();
-      incoming.cleanup();
-    }
-
-    @Override
-    public void stop() {
-      super.stop();
-      if (!oContext.isClosed()) {
-        internalStop();
-      }
-      injector.injectPause(context.getExecutionControls(), "send-complete", logger);
-    }
-
     RecordBatch getIncoming() {
       return incoming;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 18ea71d..67062f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -24,7 +24,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -64,7 +63,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
-      super(context, new OperatorContext(config, context, null, false), config);
+      super(context, context.newOperatorContext(config, null, false), config);
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();
@@ -136,13 +135,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     @Override
-    public void stop() {
-      super.stop();
-      oContext.close();
-      incoming.cleanup();
-    }
-
-    @Override
     public void receivingFragmentFinished(FragmentHandle handle) {
       done = true;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 681c3e3..9f6bea9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -117,15 +117,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (sv4 != null) {
       sv4.clear();
     }
     if (priorityQueue != null) {
       priorityQueue.cleanup();
     }
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
   public void buildSchema() throws SchemaChangeException {
@@ -424,10 +423,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     }
 
     @Override
-    public void cleanup() {
-    }
-
-    @Override
     public Iterator<VectorWrapper<?>> iterator() {
       return container.iterator();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
index aa8b611..e815bff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
@@ -31,7 +31,8 @@ public class TopNSortBatchCreator implements BatchCreator<TopN>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) throws ExecutionSetupException {
+  public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new TopNBatch(config, context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index b419f71..15fb7b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.Writer;
@@ -179,7 +178,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (recordWriter != null) {
         recordWriter.cleanup();
@@ -188,8 +187,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       logger.error("Failure while closing record writer", ex);
       throw new RuntimeException("Failed to close RecordWriter", ex);
     }
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index c29fbf2..b753574 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -135,7 +134,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
         context.fail(new SchemaChangeException("Hash aggregate does not support schema changes"));
-        cleanup();
+        close();
         killIncoming(false);
         return IterOutcome.STOP;
       default:
@@ -273,12 +272,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (aggregator != null) {
       aggregator.cleanup();
     }
-    super.cleanup();
-    incoming.cleanup();
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
index 8c60541..1397342 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
@@ -31,7 +31,8 @@ public class HashAggBatchCreator implements BatchCreator<HashAggregate>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) throws ExecutionSetupException {
+  public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new HashAggBatch(config, children.iterator().next(), context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 87cd4d6..1b90dd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -52,14 +51,12 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ObjectVector;
-import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
-import com.google.common.collect.Lists;
-
 public abstract class HashAggTemplate implements HashAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
 
@@ -351,10 +348,6 @@ public abstract class HashAggTemplate implements HashAggregator {
 
                 outputCurrentBatch();
 
-                // cleanup incoming batch since output of aggregation does not need
-                // any references to the incoming
-
-                incoming.cleanup();
                 // return setOkAndReturn();
                 return AggOutcome.RETURN_OUTCOME;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ed5b415..c1c5cb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -177,7 +176,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         return outcome;
       case UPDATE_AGGREGATOR:
         context.fail(new SchemaChangeException("Streaming aggregate does not support schema changes"));
-        cleanup();
+        close();
         killIncoming(false);
         return IterOutcome.STOP;
       default:
@@ -409,9 +408,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
-  public void cleanup() {
-    super.cleanup();
-    incoming.cleanup();
+  public void close() {
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
index 0203b81..cac5b06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
@@ -31,7 +31,8 @@ public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) throws ExecutionSetupException {
+  public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new StreamingAggBatch(config, children.iterator().next(), context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index b3a6a8f..d2282c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -62,7 +61,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  BroadcastSender config) throws OutOfMemoryException {
-    super(context, new OperatorContext(config, context, null, false), config);
+    super(context, context.newOperatorContext(config, null, false), config);
     this.ok = true;
     this.context = context;
     this.incoming = incoming;
@@ -153,10 +152,4 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
   }
 
-  @Override
-  public void stop() {
-    super.stop();
-    oContext.close();
-    incoming.cleanup();
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
index 7f2fe8e..e9b3051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -31,7 +31,8 @@ public class FilterBatchCreator implements BatchCreator<Filter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException {
+  public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new FilterRecordBatch(config, children.iterator().next(), context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 064d5c8..5eee9df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -86,14 +86,14 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
 
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (sv2 != null) {
       sv2.clear();
     }
     if (sv4 != null) {
       sv4.clear();
     }
-    super.cleanup();
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
index 6f02824..94203d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.FlattenPOP;
-import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -32,7 +31,8 @@ public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new FlattenRecordBatch(config, children.iterator().next(), context);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 79fe177..dd53477 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -516,7 +516,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (hjHelper != null) {
       hjHelper.clear();
     }
@@ -529,9 +529,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     if (hashTable != null) {
       hashTable.clear();
     }
-    super.cleanup();
-    right.cleanup();
-    left.cleanup();
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index bfe89c0..1402769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -30,7 +30,8 @@ import com.google.common.base.Preconditions;
 public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 2);
     return new HashJoinBatch(config, context, children.get(0), children.get(1));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1a7e60e..6466f70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -256,14 +255,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     right.kill(sendUpstream);
   }
 
-  @Override
-  public void cleanup() {
-      super.cleanup();
-
-      left.cleanup();
-      right.cleanup();
-  }
-
   private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
       LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus,
       ErrorCollector collector) throws ClassTransformationException {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 7d100af..24f5533 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -32,7 +32,8 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 2);
     if(config.getJoinType() == JoinRelType.RIGHT){
       return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0));

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4fb1409..d20bfa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.LinkedList;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
@@ -40,12 +40,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import com.google.common.base.Preconditions;
 
-import java.io.IOException;
-import java.util.LinkedList;
+import com.google.common.base.Preconditions;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
 
 /*
  * RecordBatch implementation for the nested loop join operator
@@ -309,12 +308,10 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     rightContainer.clear();
     rightCounts.clear();
-    super.cleanup();
-    right.cleanup();
-    left.cleanup();
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
index 12588ac..2e708a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
@@ -17,17 +17,18 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import java.util.List;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
-import java.util.List;
-
 public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> {
   @Override
-  public RecordBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
index e71daba..f954e72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Iterables;
 
 public class LimitBatchCreator implements BatchCreator<Limit> {
   @Override
-  public RecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException {
+  public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 7e66893..eff9e61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -194,9 +194,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  public void cleanup(){
+  public void close(){
     outgoingSv.clear();
-    super.cleanup();
+    super.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 40cbc89..c36b0d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -44,7 +44,6 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -130,7 +129,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public MergingRecordBatch(final FragmentContext context,
                             final MergingReceiverPOP config,
                             final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
-    super(config, context, true, new OperatorContext(config, context, false));
+    super(config, context, true, context.newOperatorContext(config, false));
     //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
@@ -487,7 +486,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     if (sendUpstream) {
       informSenders();
     } else {
-      cleanup();
+      close();
       for (final RawFragmentBatchProvider provider : fragProviders) {
         provider.kill(context);
       }
@@ -697,7 +696,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     outgoingContainer.clear();
     if (batchLoaders != null) {
       for (final RecordBatchLoader rbl : batchLoaders) {
@@ -706,7 +705,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         }
       }
     }
-    oContext.close();
     if (fragProviders != null) {
       for (final RawFragmentBatchProvider f : fragProviders) {
         f.cleanup();

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index b26c78a..63b7eba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -158,9 +158,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
 
   @Override
-  public void cleanup() {
-    incoming.cleanup();
-    super.cleanup();
+  public void close() {
+    super.close();
     this.partitionVectors.clear();
     this.partitionKeyVector.clear();
   }
@@ -302,9 +301,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       for (VectorWrapper<?> w : finalTable.get()) {
         partitionVectors.add(w.getValueVector());
       }
+
     } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
       kill(false);
-      logger.error("Failure while building final partition table.", ex);
       context.fail(ex);
       return false;
       // TODO InterruptedException
@@ -467,7 +466,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     // If this is the first iteration, we need to generate the partition vectors before we can proceed
     if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
       if (!getPartitionVectors()) {
-        cleanup();
+        close();
         return IterOutcome.STOP;
       }
 
@@ -503,7 +502,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     case NONE:
     case NOT_YET:
     case STOP:
-      cleanup();
+      close();
       recordCount = 0;
       return upstream;
     case OK_NEW_SCHEMA:

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7e3f4b2..cf7ba16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
@@ -101,7 +100,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
-    super(context, new OperatorContext(operator, context, null, false), operator);
+    super(context, context.newOperatorContext(operator, null, false), operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
@@ -141,8 +140,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public boolean innerNext() {
 
     if (!ok) {
-      stop();
-
       return false;
     }
 
@@ -322,17 +319,15 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  public void stop() {
+  public void close() throws Exception {
     logger.debug("Partition sender stopping.");
-    super.stop();
+    super.close();
     ok = false;
     if (partitioner != null) {
       updateAggregateStats();
       partitioner.clear();
     }
 
-    oContext.close();
-    incoming.cleanup();
   }
 
   public void sendEmptyBatch(boolean isLast) {

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c2d6166..35bf3cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -187,7 +187,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     stop = true;
     try {
       cleanUpLatch.await();
@@ -195,9 +195,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
       // TODO InterruptedException
     } finally {
-      super.cleanup();
+      super.close();
       clearQueue();
-      incoming.cleanup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88bb0519/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
index c568ed4..6542576 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Iterables;
 
 public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> {
   @Override
-  public RecordBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException {
+  public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children)
+      throws ExecutionSetupException {
     return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children));
   }
 }