Posted to commits@drill.apache.org by ag...@apache.org on 2020/02/21 12:03:33 UTC

[drill] 04/10: DRILL-7583: Remove STOP status from operator outcome

This is an automated email from the ASF dual-hosted git repository.

agozhiy pushed a commit to branch MERGE-200221-00
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5015d0e9b70092af0b9cb475ec3e748583c4c897
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Thu Feb 13 10:17:00 2020 -0800

    DRILL-7583: Remove STOP status from operator outcome
    
    Now that all operators have been converted to throw
    exceptions on error conditions, the STOP status is
    unused. This patch removes the STOP status and the
    related kill() and killIncoming() methods. The
    "kill" methods are replaced by "cancel" methods, which
    handle "normal"-case cancellation, such as for LIMIT.
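    
    For illustration only (not part of this patch): under the new contract,
    errors surface as thrown exceptions, and the only remaining "stop early"
    signal is cancel(), which replaces kill(boolean sendUpstream). Below is a
    minimal, self-contained sketch of how a LIMIT-style consumer uses it; the
    types are simplified stand-ins, not the real Drill classes.
    
        // Sketch only: simplified stand-ins for the RecordBatch contract
        // after this change; not the actual Drill interfaces.
        public class CancelSketch {
        
          interface Batch {
            enum IterOutcome { OK, OK_NEW_SCHEMA, NONE, EMIT } // no STOP
            IterOutcome next();
            void cancel(); // replaces kill(boolean sendUpstream)
          }
        
          // A LIMIT-like consumer: once enough batches have been seen,
          // cancel the upstream instead of returning a STOP outcome.
          static Batch.IterOutcome consumeWithLimit(Batch incoming, int limit) {
            int seen = 0;
            Batch.IterOutcome out;
            while ((out = incoming.next()) == Batch.IterOutcome.OK
                || out == Batch.IterOutcome.OK_NEW_SCHEMA) {
              seen++;
              if (seen >= limit) {
                incoming.cancel(); // normal-case cancellation, not an error
                return Batch.IterOutcome.NONE;
              }
            }
            return out;
          }
        }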
    
    closes #1981
---
 .../physical/PhysicalOperatorSetupException.java   |    8 +-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   17 +-
 .../drill/exec/physical/impl/ScreenCreator.java    |    2 -
 .../exec/physical/impl/SingleSenderCreator.java    |    9 +-
 .../physical/impl/StatisticsWriterRecordBatch.java |    7 +-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |    9 +-
 .../exec/physical/impl/WriterRecordBatch.java      |    7 +-
 .../exec/physical/impl/aggregate/HashAggBatch.java |  131 ++-
 .../physical/impl/aggregate/HashAggTemplate.java   |  232 +++--
 .../impl/aggregate/SpilledRecordBatch.java         |   10 +-
 .../physical/impl/aggregate/StreamingAggBatch.java |    9 +-
 .../impl/aggregate/StreamingAggTemplate.java       |    2 -
 .../broadcastsender/BroadcastSenderRootExec.java   |    1 -
 .../physical/impl/filter/EvalSetupException.java   |    2 +-
 .../physical/impl/filter/EvaluationPredicate.java  |   10 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |    4 +-
 .../physical/impl/flatten/FlattenTemplate.java     |    8 +-
 .../exec/physical/impl/join/HashJoinBatch.java     | 1014 ++++++++++++--------
 .../impl/join/HashJoinMemoryCalculatorImpl.java    |   20 +-
 .../physical/impl/join/HashJoinProbeTemplate.java  |    3 +-
 .../impl/join/HashJoinStateCalculator.java         |    2 +-
 .../drill/exec/physical/impl/join/JoinStatus.java  |    5 -
 .../exec/physical/impl/join/LateralJoinBatch.java  |   20 +-
 .../exec/physical/impl/join/MergeJoinBatch.java    |   12 +-
 .../physical/impl/join/NestedLoopJoinBatch.java    |   10 +-
 .../exec/physical/impl/join/RowKeyJoinBatch.java   |   13 +-
 .../exec/physical/impl/limit/LimitRecordBatch.java |   42 +-
 .../impl/mergereceiver/MergingRecordBatch.java     |   12 +-
 .../impl/metadata/MetadataControllerBatch.java     |    8 -
 .../impl/metadata/MetadataHandlerBatch.java        |    4 +-
 .../OrderedPartitionProjectorTemplate.java         |   10 +-
 .../OrderedPartitionRecordBatch.java               |    7 +-
 .../orderedpartitioner/SampleCopierTemplate.java   |   11 +-
 .../partitionsender/PartitionSenderRootExec.java   |   30 +-
 .../impl/producer/ProducerConsumerBatch.java       |   30 +-
 .../physical/impl/project/ProjectRecordBatch.java  |    4 +-
 .../impl/protocol/OperatorRecordBatch.java         |    7 +-
 .../RangePartitionRecordBatch.java                 |    5 -
 .../exec/physical/impl/sort/RecordBatchData.java   |    4 +-
 .../drill/exec/physical/impl/sort/SortBatch.java   |    6 +-
 .../impl/statistics/StatisticsMergeBatch.java      |    1 -
 .../physical/impl/union/UnionAllRecordBatch.java   |    9 -
 .../physical/impl/unnest/UnnestRecordBatch.java    |    8 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   11 +-
 .../impl/unpivot/UnpivotMapsRecordBatch.java       |    1 -
 .../validate/IteratorValidatorBatchIterator.java   |   20 +-
 .../impl/window/WindowFrameRecordBatch.java        |   27 +-
 .../physical/impl/xsort/ExternalSortBatch.java     |   13 +-
 .../exec/physical/impl/xsort/MergeSortWrapper.java |    2 +-
 .../exec/physical/resultSet/impl/TupleState.java   |    2 +
 .../exec/planner/physical/PlannerSettings.java     |   24 +-
 .../exec/record/AbstractBinaryRecordBatch.java     |   11 +-
 .../drill/exec/record/AbstractRecordBatch.java     |   17 +-
 .../exec/record/AbstractUnaryRecordBatch.java      |    6 +-
 .../apache/drill/exec/record/MajorTypeSerDe.java   |    2 +
 .../org/apache/drill/exec/record/RecordBatch.java  |   52 +-
 .../exec/record/RecordBatchMemoryManager.java      |   10 +-
 .../apache/drill/exec/record/RecordBatchSizer.java |   13 +-
 .../apache/drill/exec/record/RecordIterator.java   |    1 -
 .../apache/drill/exec/record/SchemalessBatch.java  |   17 +-
 .../drill/exec/record/SimpleRecordBatch.java       |    8 +-
 .../exec/record/VectorAccessibleComplexWriter.java |    1 -
 .../java/org/apache/drill/TestCorrelation.java     |    1 -
 .../java/org/apache/drill/TestExampleQueries.java  |  128 ++-
 .../test/java/org/apache/drill/TestTpchLimit0.java |    1 -
 .../drill/exec/DrillSeparatePlanningTest.java      |    2 +-
 .../drill/exec/physical/impl/MockRecordBatch.java  |   15 +-
 .../drill/exec/physical/impl/SimpleRootExec.java   |    1 -
 .../exec/physical/impl/TestBroadcastExchange.java  |    1 +
 .../physical/impl/join/TestHashJoinOutcome.java    |   14 +-
 .../impl/join/TestLateralJoinCorrectness.java      |    2 +-
 .../impl/limit/TestLimitBatchEmitOutcome.java      |   54 +-
 .../impl/protocol/TestOperatorRecordBatch.java     |    9 +-
 .../physical/impl/unnest/MockLateralJoinBatch.java |   30 +-
 .../impl/unnest/TestUnnestCorrectness.java         |    5 +-
 .../unnest/TestUnnestWithLateralCorrectness.java   |    5 +-
 .../drill/exec/work/filter/BloomFilterTest.java    |   10 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |    3 +-
 78 files changed, 1109 insertions(+), 1175 deletions(-)
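
The other half of the change, converting the STOP error path into thrown
exceptions, is visible throughout the hunks below; the ScanBatch hunk is
representative. Condensed from that hunk (surrounding context simplified, so
this fragment is illustrative rather than complete):

    // Condensed from the ScanBatch change below; context simplified.
    public IterOutcome next() {
      try {
        return internalNext();
      } catch (OutOfMemoryException ex) {
        clearFieldVectorMap();
        // previously this also set lastOutcome = IterOutcome.STOP; now the
        // thrown UserException alone reports the failure
        throw UserException.memoryError(ex).build(logger);
      } catch (UserException ex) {
        throw ex;
      } catch (Exception ex) {
        throw UserException.internalError(ex).build(logger);
      } finally {
        oContext.getStats().stopProcessing();
      }
    }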

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalOperatorSetupException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalOperatorSetupException.java
index 66acec0..0c2e06c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalOperatorSetupException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalOperatorSetupException.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.physical;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 
 
+@SuppressWarnings("serial")
 public class PhysicalOperatorSetupException extends ExecutionSetupException {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorSetupException.class);
 
-  public PhysicalOperatorSetupException() {
-    super();
-  }
+  public PhysicalOperatorSetupException() { }
 
   public PhysicalOperatorSetupException(String message, Throwable cause, boolean enableSuppression,
       boolean writableStackTrace) {
@@ -43,6 +41,4 @@ public class PhysicalOperatorSetupException extends ExecutionSetupException {
   public PhysicalOperatorSetupException(Throwable cause) {
     super(cause);
   }
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d314794..e159ec6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -174,12 +174,9 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    if (sendUpstream) {
-      done = true;
-    } else {
-      releaseAssets();
-    }
+  public void cancel() {
+    done = true;
+    releaseAssets();
   }
 
   /**
@@ -285,13 +282,10 @@ public class ScanBatch implements CloseableRecordBatch {
       return internalNext();
     } catch (OutOfMemoryException ex) {
       clearFieldVectorMap();
-      lastOutcome = IterOutcome.STOP;
       throw UserException.memoryError(ex).build(logger);
     } catch (UserException ex) {
-      lastOutcome = IterOutcome.STOP;
       throw ex;
     } catch (Exception ex) {
-      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
@@ -630,11 +624,6 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public boolean hasFailed() {
-    return lastOutcome == IterOutcome.STOP;
-  }
-
-  @Override
   public void dump() {
     logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 765e1de..0f7ca13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -83,8 +83,6 @@ public class ScreenCreator implements RootCreator<Screen> {
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
-        case STOP:
-          return false;
         case NONE:
           if (firstBatch) {
             // this is the only data message sent to the client and may contain the schema
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index d14d09a..c23d968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -53,7 +53,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private AccountingDataTunnel tunnel;
     private final FragmentHandle handle;
     private final int recMajor;
-    private volatile boolean ok = true;
     private volatile boolean done = false;
 
     public enum Metric implements MetricDef {
@@ -82,21 +81,15 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     @Override
     public boolean innerNext() {
-      if (!ok) {
-        incoming.kill(false);
-
-        return false;
-      }
 
       IterOutcome out;
       if (!done) {
         out = next(incoming);
       } else {
-        incoming.kill(true);
+        incoming.cancel();
         out = IterOutcome.NONE;
       }
       switch (out) {
-      case STOP:
       case NONE:
         // if we didn't do anything yet, send an empty schema.
         final BatchSchema sendSchema = incoming.getSchema() == null ?
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
index e54f76d..daa5e33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
@@ -69,8 +69,8 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
@@ -97,9 +97,6 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
       upstream = next(incoming);
 
       switch(upstream) {
-        case STOP:
-          return upstream;
-
         case NOT_YET:
         case NONE:
           break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index d600f12..e094d56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -157,9 +157,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         container.setRecordCount(0);
 
         return;
-      case STOP:
-        state = BatchState.STOP;
-        return;
       case NONE:
         state = BatchState.DONE;
         return;
@@ -223,8 +220,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         break outer;
       case NOT_YET:
         throw new UnsupportedOperationException();
-      case STOP:
-        return lastKnownOutcome;
       case OK_NEW_SCHEMA:
         // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
         // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
@@ -486,8 +481,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 1b368d2..3c2ba08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -70,8 +70,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
@@ -93,9 +93,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       upstream = next(incoming);
 
       switch(upstream) {
-        case STOP:
-          return upstream;
-
         case NOT_YET:
           break;
         case NONE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 506b594..7171e24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -212,10 +212,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
     final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase();
 
-    if ( phase.is2nd() && !fallbackEnabled ) {
+    if (phase.is2nd() && !fallbackEnabled) {
       minBatchesNeeded *= 2;  // 2nd phase (w/o fallback) needs at least 2 partitions
     }
-    if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - memAvail may be bigger than max-int
+    if (configuredBatchSize > memAvail / minBatchesNeeded) { // no cast - memAvail may be bigger than max-int
       int reducedBatchSize = (int)(memAvail / minBatchesNeeded);
       logger.trace("Reducing configured batch size from: {} to: {}, due to Mem limit: {}",
         configuredBatchSize, reducedBatchSize, memAvail);
@@ -250,9 +250,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         state = BatchState.DONE;
         container.buildSchema(SelectionVectorMode.NONE);
         return;
-      case STOP:
-        state = BatchState.STOP;
-        return;
       default:
         break;
     }
@@ -280,11 +277,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         // or: 1st phase need to return (not fully grouped) partial output due to memory pressure
         aggregator.earlyOutput() ||
         // or: while handling an EMIT - returning output for that section
-        aggregator.handlingEmit() ) {
+        aggregator.handlingEmit()) {
       // then output the next batch downstream
       HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
       // if Batch returned, or end of data, or Emit - then return the appropriate iter outcome
-      switch ( aggOut ) {
+      switch (aggOut) {
         case AGG_NONE:
           return IterOutcome.NONE;
         case AGG_OK:
@@ -297,62 +294,64 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
     }
 
-    if (wasKilled) { // if kill() was called before, then finish up
+    if (wasKilled) { // if cancel() was called before, then finish up
       aggregator.cleanup();
-      incoming.kill(false);
       return IterOutcome.NONE;
     }
 
     // Read and aggregate records
-    // ( may need to run again if the spilled partition that was read
-    //   generated new partitions that were all spilled )
+    // (may need to run again if the spilled partition that was read
+    //  generated new partitions that were all spilled)
     AggOutcome out;
     do {
-      //
       //  Read incoming batches and process their records
-      //
       out = aggregator.doWork();
     } while (out == AggOutcome.CALL_WORK_AGAIN);
 
     switch (out) {
-    case CLEANUP_AND_RETURN:
-      container.zeroVectors();
-      aggregator.cleanup();
-      state = BatchState.DONE;
-      // fall through
-    case RETURN_OUTCOME:
-      // rebuilds the schema in the case of complex writer expressions,
-      // since vectors would be added to batch run-time
-      IterOutcome outcome = aggregator.getOutcome();
-      switch (outcome) {
-        case OK:
-        case OK_NEW_SCHEMA:
-          if (firstBatch) {
-            if (CollectionUtils.isNotEmpty(complexWriters)) {
-              container.buildSchema(SelectionVectorMode.NONE);
-              outcome = IterOutcome.OK_NEW_SCHEMA;
+      case CLEANUP_AND_RETURN:
+        container.zeroVectors();
+        aggregator.cleanup();
+        state = BatchState.DONE;
+        // fall through
+      case RETURN_OUTCOME:
+        // rebuilds the schema in the case of complex writer expressions,
+        // since vectors would be added to batch run-time
+        IterOutcome outcome = aggregator.getOutcome();
+        switch (outcome) {
+          case OK:
+          case OK_NEW_SCHEMA:
+            if (firstBatch) {
+              if (CollectionUtils.isNotEmpty(complexWriters)) {
+                container.buildSchema(SelectionVectorMode.NONE);
+                // You'd be forgiven for thinking we should always return
+                // OK_NEW_SCHEMA for the first batch. It turns out, when
+                // two hash aggs are stacked, we get an error if the
+                // upstream one returns OK_NEW_SCHEMA first. Not sure the
+                // details, only know several tests fail.
+                outcome = IterOutcome.OK_NEW_SCHEMA;
+              }
+              firstBatch = false;
             }
-            firstBatch = false;
-          }
-          // fall thru
-        default:
-          return outcome;
-      }
-
-    case UPDATE_AGGREGATOR:
-      throw UserException.unsupportedError()
-          .message(SchemaChangeException.schemaChanged(
-              "Hash aggregate does not support schema change",
-              incomingSchema,
-              incoming.getSchema()).getMessage())
-          .build(logger);
-    default:
-      throw new IllegalStateException(String.format("Unknown state %s.", out));
+            break;
+          default:
+        }
+        return outcome;
+
+      case UPDATE_AGGREGATOR:
+        throw UserException.unsupportedError()
+            .message(SchemaChangeException.schemaChanged(
+                "Hash aggregate does not support schema change",
+                incomingSchema,
+                incoming.getSchema()).getMessage())
+            .build(logger);
+      default:
+        throw new IllegalStateException(String.format("Unknown state %s.", out));
     }
   }
 
   /**
-   * Creates a new Aggregator based on the current schema. If setup fails, this
+   * Creates a new aggregator based on the current schema. If setup fails, this
    * method is responsible for cleaning up and informing the context of the
    * failure state, as well is informing the upstream operators.
    *
@@ -503,22 +502,22 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
     switch (incoming.getSchema().getSelectionVectorMode()) {
-    case FOUR_BYTE: {
-      JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
-      cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
-      cg.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
-      return;
-    }
-    case NONE: {
-      cg.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
-      return;
-    }
-    case TWO_BYTE: {
-      JVar var = cg.declareClassField("sv2_", cg.getModel()._ref(SelectionVector2.class));
-      cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
-      cg.getBlock("getVectorIndex")._return(var.invoke("getIndex").arg(JExpr.direct("recordIndex")));
-      return;
-    }
+      case FOUR_BYTE: {
+        JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
+        cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
+        cg.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+        return;
+      }
+      case NONE: {
+        cg.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
+        return;
+      }
+      case TWO_BYTE: {
+        JVar var = cg.declareClassField("sv2_", cg.getModel()._ref(SelectionVector2.class));
+        cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
+        cg.getBlock("getVectorIndex")._return(var.invoke("getIndex").arg(JExpr.direct("recordIndex")));
+        return;
+      }
     }
   }
 
@@ -552,16 +551,16 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void cancelIncoming() {
     wasKilled = true;
-    incoming.kill(sendUpstream);
+    incoming.cancel();
   }
 
   @Override
   public void dump() {
     logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " +
-            "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
+            "incomingSchema={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
         container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema,
-        wasKilled, numGroupByExprs, numAggrExprs, popConfig);
+        numGroupByExprs, numAggrExprs, popConfig);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 3f13a5a..ba94faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -67,6 +67,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -109,7 +110,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private boolean useMemoryPrediction; // whether to use memory prediction to decide when to spill
   private long estMaxBatchSize; // used for adjusting #partitions and deciding when to spill
   private long estRowWidth; // the size of the internal "row" (keys + values + extra columns)
-  private long estValuesRowWidth; // the size of the internal values ( values + extra )
+  private long estValuesRowWidth; // the size of the internal values (values + extra)
   private long estOutputRowWidth; // the size of the output "row" (no extra columns)
   private long estValuesBatchSize; // used for "reserving" memory for the Values batch to overcome an OOM
   private long estOutgoingAllocSize; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
@@ -201,7 +202,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     public void setTargetBatchRowCount(int batchRowCount) {
-      this.targetBatchRowCount = batchRowCount;
+      targetBatchRowCount = batchRowCount;
     }
 
     public int getCurrentRowCount() {
@@ -212,7 +213,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       aggrValuesContainer = new VectorContainer();
       boolean success = false;
-      this.targetBatchRowCount = batchRowCount;
+      targetBatchRowCount = batchRowCount;
 
       try {
         ValueVector vector;
@@ -350,7 +351,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           "expressions.");
     }
 
-    this.htIdxHolder = new IndexPointer();
+    htIdxHolder = new IndexPointer();
     materializedValueFields = new MaterializedField[valueFieldIds.size()];
 
     if (valueFieldIds.size() > 0) {
@@ -368,7 +369,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
 
-    // Start calculating the row widths (with the extra columns; the rest would be done in updateEstMaxBatchSize() )
+    // Start calculating the row widths (with the extra columns; the rest would be done in updateEstMaxBatchSize())
     estRowWidth = extraRowBytes;
     estValuesRowWidth = extraRowBytes;
 
@@ -390,13 +391,13 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     // Set the number of partitions from the configuration (raise to a power of two, if needed)
     int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
-    if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do early return with 1 partition
+    if (numPartitions == 1 && phase.is2nd()) { // 1st phase can still do early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
     }
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
 
-    if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
+    if (schema == null) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
     else {
       // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
       updateEstMaxBatchSize(incoming);
@@ -405,17 +406,17 @@ public abstract class HashAggTemplate implements HashAggregator {
     reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize;
     long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - reserveOutgoingMemory;
     long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
-    if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little memory available"); }
+    if (memAvail <= 0) { throw new OutOfMemoryException("Too little memory available"); }
     allocator.setLimit(newMemoryLimit);
 
-    if ( !canSpill ) { // single phase, or spill disabled by configuation
+    if (!canSpill) { // single phase, or spill disabled by configuation
       numPartitions = 1; // single phase should use only a single partition (to save memory)
     } else { // two phase
       // Adjust down the number of partitions if needed - when the memory available can not hold as
       // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
-      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail ) {
+      while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail) {
         numPartitions /= 2;
-        if ( numPartitions < 2) {
+        if (numPartitions < 2) {
           if (phase.is2nd()) {
             canSpill = false;  // 2nd phase needs at least 2 to make progress
 
@@ -440,7 +441,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     // The following initial safety check should be revisited once we can lower the number of rows in a batch
     // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
-    if ( numPartitions == 1 && ! canSpill ) {
+    if (numPartitions == 1 && !canSpill) {
       // if too little memory - behave like the old code -- practically no memory limit for hash aggregate
       // (but 1st phase can still spill, so it will maintain the original memory limit)
       allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
@@ -459,9 +460,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     plannedBatches = numPartitions; // each partition should allocate its first batch
 
     // initialize every (per partition) entry in the arrays
-    for (int i = 0; i < numPartitions; i++ ) {
+    for (int i = 0; i < numPartitions; i++) {
       try {
-        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
+        htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
       } catch (ClassTransformationException e) {
         throw UserException.unsupportedError(e)
             .message("Code generation error - likely an error in the code.")
@@ -473,7 +474,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       } catch (SchemaChangeException sce) {
         throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
       }
-      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+      batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
     }
     // Initialize the value vectors in the generated code (which point to the incoming or outgoing fields)
     try {
@@ -491,13 +492,13 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
     baseHashTable.updateIncoming(newIncoming, null); // after a spill - a new incoming
-    this.incoming = newIncoming;
+    incoming = newIncoming;
     currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
     nextPartitionToReturn = 0;
-    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++) {
       htables[i].updateIncoming(newIncoming.getContainer(), null);
       htables[i].reset();
-      if ( batchHolders[i] != null) {
+      if (batchHolders[i] != null) {
         for (BatchHolder bh : batchHolders[i]) {
           bh.clear();
         }
@@ -517,7 +518,7 @@ public abstract class HashAggTemplate implements HashAggregator {
    * @param incoming
    */
   private void updateEstMaxBatchSize(RecordBatch incoming) {
-    if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
+    if (estMaxBatchSize > 0) { return; }  // no handling of a schema (or varchar) change
     // Use the sizer to get the input row width and the length of the longest varchar column
     RecordBatchSizer sizer = outgoing.getRecordBatchMemoryManager().getRecordBatchSizer();
     logger.trace("Incoming sizer: {}",sizer);
@@ -543,7 +544,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           TypeHelper.getSize(mr.getType());
       estRowWidth += fieldSize;
       estOutputRowWidth += fieldSize;
-      if ( fieldId < numGroupByOutFields ) { fieldId++; }
+      if (fieldId < numGroupByOutFields) { fieldId++; }
       else { estValuesRowWidth += fieldSize; }
     }
     // multiply by the max number of rows in a batch to get the final estimated max size
@@ -561,15 +562,16 @@ public abstract class HashAggTemplate implements HashAggregator {
     logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {}  memory limit: {}  max column width: {}",
       phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
 
-    if ( estMaxBatchSize > allocator.getLimit() ) {
+    if (estMaxBatchSize > allocator.getLimit()) {
       logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,allocator.getLimit());
     }
   }
 
   /**
-   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
-   *  Once complete, get the incoming NEXT batch and process it as well, etc.
-   *  For 1st phase, may return when an early output needs to be performed.
+   * Read and process (i.e., insert into the hash table and aggregate) records
+   * from the current batch. Once complete, get the incoming NEXT batch and
+   * process it as well, etc. For 1st phase, may return when an early output
+   * needs to be performed.
    *
    * @return Agg outcome status
    */
@@ -578,9 +580,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     while (true) {
 
-      // This would be called only once - first time actual data arrives on incoming
-      if ( schema == null && incoming.getRecordCount() > 0 ) {
-        this.schema = incoming.getSchema();
+      // This is called only once - first time actual data arrives on incoming
+      if (schema == null && incoming.getRecordCount() > 0) {
+        schema = incoming.getSchema();
         currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
         // Calculate the number of partitions based on actual incoming data
         delayedSetup();
@@ -591,9 +593,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         outgoing.getRecordBatchMemoryManager().update(incoming);
       }
 
-      //
       //  loop through existing records in this batch, aggregating the values as necessary.
-      //
       if (EXTRA_DEBUG_1) {
         logger.debug("Starting outer loop of doWork()...");
       }
@@ -603,12 +603,12 @@ public abstract class HashAggTemplate implements HashAggregator {
         }
         checkGroupAndAggrValues(currentIndex);
 
-        if ( retrySameIndex ) { retrySameIndex = false; }  // need to retry this row (e.g. we had an OOM)
+        if (retrySameIndex) {retrySameIndex = false; }  // need to retry this row (e.g. we had an OOM)
         else { incIndex(); } // next time continue with the next incoming row
 
         // If adding a group discovered a memory pressure during 1st phase, then start
         // outputing some partition downstream in order to free memory.
-        if ( earlyOutput ) {
+        if (earlyOutput) {
           outputCurrentBatch();
           return AggOutcome.RETURN_OUTCOME;
         }
@@ -619,14 +619,11 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
 
       // Cleanup the previous batch since we are done processing it.
-      for (VectorWrapper<?> v : incoming) {
-        v.getValueVector().clear();
-      }
+      VectorAccessibleUtilities.clear(incoming);
 
-      if ( handleEmit ) {
+      if (handleEmit) {
         outcome = IterOutcome.NONE; // finished behaving like OK, now behave like NONE
-      }
-      else {
+      } else {
         //
         // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
         // from one of the spill files (The spill case is handled differently here to avoid
@@ -661,7 +658,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           if (EXTRA_DEBUG_1) {
             logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
           }
-          this.cleanup();
+          cleanup();
           // TODO: new schema case needs to be handled appropriately
           return AggOutcome.UPDATE_AGGREGATOR;
 
@@ -690,7 +687,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           // the outgoing batch downstream (see innerNext() in HashAggBatch).
           buildComplete = true; // now should go and return outgoing
 
-          if ( handleEmit ) {
+          if (handleEmit) {
             buildComplete = false; // This was not a real NONE - more incoming is expected
             // don't aggregate this incoming batch again (in the loop above; when doWork() is called again)
             currentBatchRecordCount = 0;
@@ -701,7 +698,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           // in response to each next() call by a downstream operator
           AggIterOutcome aggOutcome = outputCurrentBatch();
 
-          switch ( aggOutcome ) {
+          switch (aggOutcome) {
             case AGG_RESTART:
               // Output of first batch returned a RESTART (all new partitions were spilled)
               return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
@@ -730,7 +727,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private void useReservedValuesMemory() {
     // try to preempt an OOM by using the reserved memory
     long reservedMemory = reserveValueBatchMemory;
-    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+    if (reservedMemory > 0) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
 
     reserveValueBatchMemory = 0;
   }
@@ -740,7 +737,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private void useReservedOutgoingMemory() {
     // try to preempt an OOM by using the reserved memory
     long reservedMemory = reserveOutgoingMemory;
-    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+    if (reservedMemory > 0) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
 
     reserveOutgoingMemory = 0;
   }
@@ -749,16 +746,16 @@ public abstract class HashAggTemplate implements HashAggregator {
    *
    */
   private void restoreReservedMemory() {
-    if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
+    if (0 == reserveOutgoingMemory) { // always restore OutputValues first (needed for spilling)
       long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
-      if ( memAvail > estOutgoingAllocSize) {
+      if (memAvail > estOutgoingAllocSize) {
         allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
         reserveOutgoingMemory = estOutgoingAllocSize;
       }
     }
-    if ( 0 == reserveValueBatchMemory ) {
+    if (0 == reserveValueBatchMemory) {
       long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
-      if ( memAvail > estValuesBatchSize) {
+      if (memAvail > estValuesBatchSize) {
         allocator.setLimit(allocator.getLimit() - estValuesBatchSize);
         reserveValueBatchMemory = estValuesBatchSize;
       }
@@ -816,7 +813,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth) {
-    for (int i = 0; i < spilledState.getNumPartitions(); i++ ) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++) {
       if (batchHolders[i] == null || batchHolders[i].size() == 0) {
         continue;
       }
@@ -832,18 +829,18 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public void cleanup() {
-    if ( schema == null ) { return; } // not set up; nothing to clean
-    if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
+    if (schema == null) { return; } // not set up; nothing to clean
+    if (phase.is2nd() && spillSet.getWriteBytes() > 0) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
     // clean (and deallocate) each partition
-    for ( int i = 0; i < spilledState.getNumPartitions(); i++) {
+    for (int i = 0; i < spilledState.getNumPartitions(); i++) {
           if (htables[i] != null) {
               htables[i].clear();
               htables[i] = null;
           }
-          if ( batchHolders[i] != null) {
+          if (batchHolders[i] != null) {
               for (BatchHolder bh : batchHolders[i]) {
                     bh.clear();
               }
@@ -852,7 +849,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           }
 
           // delete any (still active) output spill file
-          if ( writers[i] != null && spillFiles[i] != null) {
+          if (writers[i] != null && spillFiles[i] != null) {
             try {
               spillSet.close(writers[i]);
               writers[i] = null;
@@ -873,7 +870,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         }
     }
     // Delete the currently handled (if any) spilled file
-    if ( newIncoming != null ) { newIncoming.close();  }
+    if (newIncoming != null) { newIncoming.close();  }
     spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
@@ -884,7 +881,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private void reinitPartition(int part) /* throws SchemaChangeException /*, IOException */ {
     assert htables[part] != null;
     htables[part].reset();
-    if ( batchHolders[part] != null) {
+    if (batchHolders[part] != null) {
       for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
@@ -929,20 +926,20 @@ public abstract class HashAggTemplate implements HashAggregator {
    * @return The partition (number) chosen to be spilled
    */
   private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
-    if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
+    if (phase.is1st() && !tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
     int currPartSize = batchHolders[currPart].size();
-    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+    if (currPartSize == 1) { currPartSize = -1; } // don't pick current if size is 1
     // first find the largest spilled partition
     int maxSizeSpilled = -1;
     int indexMaxSpilled = -1;
-    for (int isp = 0; isp < spilledState.getNumPartitions(); isp++ ) {
-      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+    for (int isp = 0; isp < spilledState.getNumPartitions(); isp++) {
+      if (isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size()) {
         maxSizeSpilled = batchHolders[isp].size();
         indexMaxSpilled = isp;
       }
     }
     // Give the current (if already spilled) some priority
-    if ( ! tryAvoidCurr && isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+    if (!tryAvoidCurr && isSpilled(currPart) && (currPartSize + 1 >= maxSizeSpilled)) {
       maxSizeSpilled = currPartSize;
       indexMaxSpilled = currPart;
     }
@@ -950,21 +947,21 @@ public abstract class HashAggTemplate implements HashAggregator {
     int maxSize = -1;
     int indexMax = -1;
     // Use the largest spilled (if found) as a base line, with a factor of 4
-    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+    if (indexMaxSpilled > -1 && maxSizeSpilled > 1) {
       indexMax = indexMaxSpilled;
       maxSize = 4 * maxSizeSpilled;
     }
-    for ( int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
-      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+    for (int insp = 0; insp < spilledState.getNumPartitions(); insp++) {
+      if (!isSpilled(insp) && maxSize < batchHolders[insp].size()) {
         indexMax = insp;
         maxSize = batchHolders[insp].size();
       }
     }
     // again - priority to the current partition
-    if ( ! tryAvoidCurr && ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+    if (!tryAvoidCurr && !isSpilled(currPart) && (currPartSize + 1 >= maxSize)) {
       return currPart;
     }
-    if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
+    if (maxSize <= 1) { // Can not make progress by spilling a single batch!
       return -1; // try skipping this spill
     }
     return indexMax;
@@ -979,14 +976,14 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     ArrayList<BatchHolder> currPartition = batchHolders[part];
     rowsInPartition = 0;
-    if ( EXTRA_DEBUG_SPILL ) {
+    if (EXTRA_DEBUG_SPILL) {
       logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, spilledState.getCycle(), currPartition.size());
     }
 
-    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+    if (currPartition.size() == 0) { return; } // in case empty - nothing to spill
 
     // If this is the first spill for this partition, create an output stream
-    if ( ! isSpilled(part) ) {
+    if (!isSpilled(part)) {
 
       spillFiles[part] = spillSet.getNextSpillFile(spilledState.getCycle() > 0 ? Integer.toString(spilledState.getCycle()) : null);
 
@@ -999,7 +996,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
 
-    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++) {
 
       // get the number of records in the batch holder that are pending output
       int numOutputRecords = currPartition.get(currOutBatchIndex).getNumPendingOutput();
@@ -1009,7 +1006,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       allocateOutgoing(numOutputRecords);
       currPartition.get(currOutBatchIndex).outputValues();
-      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
+      htables[part].outputKeys(currOutBatchIndex, outContainer, numOutputRecords);
 
       // set the value count for outgoing batch value vectors
       outContainer.setValueCount(numOutputRecords);
@@ -1045,7 +1042,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder(int batchRowCount) {
-    return this.injectMembers(new BatchHolder(batchRowCount));
+    return injectMembers(new BatchHolder(batchRowCount));
   }
 
   protected BatchHolder injectMembers(BatchHolder batchHolder) {
@@ -1063,24 +1060,24 @@ public abstract class HashAggTemplate implements HashAggregator {
   public AggIterOutcome outputCurrentBatch() {
 
     // Handle the case of an EMIT with an empty batch
-    if ( handleEmit && ( batchHolders == null || batchHolders[0].size() == 0 ) ) {
+    if (handleEmit && (batchHolders == null || batchHolders[0].size() == 0)) {
       lastBatchOutputCount = 0; // empty
       allocateOutgoing(0);
       outgoing.getContainer().setValueCount(0);
 
       // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
-      this.outcome = IterOutcome.EMIT;
+      outcome = IterOutcome.EMIT;
       handleEmit = false; // finish handling EMIT
-      if ( outBatchIndex != null ) {
+      if (outBatchIndex != null) {
         outBatchIndex[0] = 0; // reset, for the next EMIT
       }
       return AggIterOutcome.AGG_EMIT;
     }
 
     // when incoming was an empty batch, just finish up
-    if ( schema == null ) {
+    if (schema == null) {
       logger.trace("Incoming was empty; output is an empty batch.");
-      this.outcome = IterOutcome.NONE; // no records were read
+      outcome = IterOutcome.NONE; // no records were read
       allFlushed = true;
       return AggIterOutcome.AGG_NONE;
     }
@@ -1090,14 +1087,14 @@ public abstract class HashAggTemplate implements HashAggregator {
     int currOutBatchIndex = outBatchIndex[earlyPartition];
     int partitionToReturn = earlyPartition;
 
-    if ( ! earlyOutput ) {
+    if (!earlyOutput) {
       // Update the next partition to return (if needed)
       // skip fully returned (or spilled) partitions
       while (nextPartitionToReturn < spilledState.getNumPartitions()) {
         //
         // If this partition was spilled - spill the rest of it and skip it
         //
-        if ( isSpilled(nextPartitionToReturn) ) {
+        if (isSpilled(nextPartitionToReturn)) {
           spillAPartition(nextPartitionToReturn); // spill the rest
           HashAggSpilledPartition sp = new HashAggSpilledPartition(
             spilledState.getCycle(),
@@ -1134,8 +1131,8 @@ public abstract class HashAggTemplate implements HashAggregator {
         // The following "if" is probably never used; due to a similar check at the end of this method
         if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
-          this.outcome = IterOutcome.NONE;
-          if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
+          outcome = IterOutcome.NONE;
+          if (phase.is2nd() && spillSet.getWriteBytes() > 0) {
             stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
                 (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
           }
@@ -1162,7 +1159,6 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
 
       partitionToReturn = nextPartitionToReturn;
-
     }
 
     // get the number of records in the batch holder that are pending output
@@ -1170,15 +1166,15 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     // The following accounting is for logging, metrics, etc.
     rowsInPartition += numPendingOutput;
-    if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
+    if (!handlingSpills) { rowsNotSpilled += numPendingOutput; }
     else { rowsSpilledReturned += numPendingOutput; }
-    if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
+    if (earlyOutput) { rowsReturnedEarly += numPendingOutput; }
 
     allocateOutgoing(numPendingOutput);
 
     currPartition.get(currOutBatchIndex).outputValues();
     int numOutputRecords = numPendingOutput;
-    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, numPendingOutput);
+    htables[partitionToReturn].outputKeys(currOutBatchIndex, outContainer, numPendingOutput);
 
     // set the value count for outgoing batch value vectors
     outgoing.getContainer().setValueCount(numOutputRecords);
@@ -1186,9 +1182,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
 
-    this.outcome = IterOutcome.OK;
+    outcome = IterOutcome.OK;
 
-    if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) {
+    if (EXTRA_DEBUG_SPILL && phase.is2nd()) {
       logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
         rowsNotSpilled+rowsSpilledReturned,
         rowsSpilled);
@@ -1199,7 +1195,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // if just flushed the last batch in the partition
     if (outBatchIndex[partitionToReturn] == currPartition.size()) {
 
-      if ( EXTRA_DEBUG_SPILL ) {
+      if (EXTRA_DEBUG_SPILL) {
         logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
             earlyOutput ? "(Early)" : "",
             partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
@@ -1209,17 +1205,17 @@ public abstract class HashAggTemplate implements HashAggregator {
       // deallocate memory used by this partition, and re-initialize
       reinitPartition(partitionToReturn);
 
-      if ( earlyOutput ) {
+      if (earlyOutput) {
 
-        if ( EXTRA_DEBUG_SPILL ) {
+        if (EXTRA_DEBUG_SPILL) {
           logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
         }
         outBatchIndex[earlyPartition] = 0; // reset, for next time
         earlyOutput = false; // done with early output
       }
-      else if ( handleEmit ) {
+      else if (handleEmit) {
         // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
-        this.outcome = IterOutcome.EMIT;
+        outcome = IterOutcome.EMIT;
         handleEmit = false; // finished handling EMIT
         outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
         return AggIterOutcome.AGG_EMIT;
@@ -1231,7 +1227,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         logger.trace("HashAggregate: All batches flushed.");
 
         // cleanup my internal state since there is nothing more to return
-        this.cleanup();
+        cleanup();
       }
     }
 
@@ -1274,8 +1270,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     } else { // a bug ?
       errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: " + spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
-      if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
-      if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
+      if (plannedBatches > 0) { errmsg += ". Planned batches: " + plannedBatches; }
+      if (rowsSpilled > 0) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
     }
     errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
 
@@ -1291,7 +1287,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   // index is also used for the aggregation values maintained by the hash aggregate.
   private void checkGroupAndAggrValues(int incomingRowIdx) {
     assert incomingRowIdx >= 0;
-    assert ! earlyOutput;
+    assert !earlyOutput;
 
     // The hash code is computed once, then its lower bits are used to determine the
     // partition to use, and the higher bits determine the location in the hash table.
@@ -1315,7 +1311,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     String tryingTo = phase.is1st() ? "early return" : "spill";
 
     // Proactive spill - in case there is no reserve memory - spill and retry putting later
-    if ( reserveValueBatchMemory == 0 && canSpill ) {
+    if (reserveValueBatchMemory == 0 && canSpill) {
       logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.", tryingTo);
 
       doSpill(currentPartition); // spill to free some memory
@@ -1332,13 +1328,13 @@ public abstract class HashAggTemplate implements HashAggregator {
       putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode, getTargetBatchCount());
 
     } catch (RetryAfterSpillException re) {
-      if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
+      if (!canSpill) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
 
       logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.", tryingTo);
 
       // for debugging - in case there's a leak
       long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
-      if ( memDiff > 0 ) { logger.warn("Leak: HashTable put() OOM left behind {} bytes allocated",memDiff); }
+      if (memDiff > 0) { logger.warn("Leak: HashTable put() OOM left behind {} bytes allocated",memDiff); }
 
       doSpill(currentPartition); // spill to free some memory
 
@@ -1357,7 +1353,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     //       In case put() added a new batch (for the keys) inside the hash table,
     //       then a matching batch (for the aggregate columns) needs to be created
     //
-    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+    if (putStatus == HashTable.PutStatus.NEW_BATCH_ADDED) {
       try {
 
         useReservedValuesMemory(); // try to preempt an OOM by using the reserve
@@ -1366,9 +1362,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
         restoreReservedMemory(); // restore the reserve, if possible
         // A reason to check for a spill - In case restore-reserve failed
-        needToCheckIfSpillIsNeeded = ( 0 == reserveValueBatchMemory );
+        needToCheckIfSpillIsNeeded = (0 == reserveValueBatchMemory);
 
-        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
+        if (plannedBatches > 0) { plannedBatches--; } // just allocated a planned batch
         long totalAddedMem = allocator.getAllocatedMemory() - allocatedBeforeHTput;
         long aggValuesAddedMem = allocator.getAllocatedMemory() - allocatedBeforeAggCol;
         logger.trace("MEMORY CHECK AGG: allocated now {}, added {}, total (with HT) added {}", allocator.getAllocatedMemory(),
@@ -1387,7 +1383,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       } catch (OutOfMemoryException exc) {
           throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc);
       }
-    } else if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST ) {
+    } else if (putStatus == HashTable.PutStatus.KEY_ADDED_LAST) {
         // If a batch just became full (i.e. another batch would be allocated soon) -- then need to
         // check (later, see below) if the memory limits are too close, and if so -- then spill !
         plannedBatches++; // planning to allocate one more batch
@@ -1411,7 +1407,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // (Skip this if cannot spill, or not checking memory limits; in such case an OOM may
     // be encountered later - and some OOM cases are recoverable by spilling and retrying)
     // ===================================================================================
-    if ( needToCheckIfSpillIsNeeded && canSpill && useMemoryPrediction ) {
+    if (needToCheckIfSpillIsNeeded && canSpill && useMemoryPrediction) {
       spillIfNeeded(currentPartition);
     }
   }
@@ -1425,7 +1421,7 @@ public abstract class HashAggTemplate implements HashAggregator {
    */
   private void spillIfNeeded(int currentPartition, boolean forceSpill) {
     long maxMemoryNeeded = 0;
-    if ( !forceSpill ) { // need to check the memory in order to decide
+    if (!forceSpill) { // need to check the memory in order to decide
       // calculate the (max) new memory needed now; plan ahead for at least MIN batches
       maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
@@ -1443,7 +1439,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     //
     //   Spill if (forced, or) the allocated memory plus the memory needed exceed the memory limit.
     //
-    if ( forceSpill || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+    if (forceSpill || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit()) {
 
       // Pick a "victim" partition to spill or return
       int victimPartition = chooseAPartitionToFlush(currentPartition, forceSpill);
@@ -1460,7 +1456,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         return;
       }
 
-      if ( phase.is2nd() ) {
+      if (phase.is2nd()) {
         long before = allocator.getAllocatedMemory();
 
         spillAPartition(victimPartition);
@@ -1472,7 +1468,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         // In case spilling did not free enough memory to recover the reserves
         boolean spillAgain = reserveOutgoingMemory == 0 || reserveValueBatchMemory == 0;
         // in some "edge" cases (e.g. testing), spilling one partition may not be enough
-        if ( spillAgain || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+        if (spillAgain || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit()) {
           int victimPartition2 = chooseAPartitionToFlush(victimPartition, true);
           if (victimPartition2 < 0) {
             // In the case of the forced spill, there is not enough memory to continue.
@@ -1495,7 +1491,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         earlyOutput = true;
         earlyPartition = victimPartition;
 
-        if ( EXTRA_DEBUG_SPILL ) {
+        if (EXTRA_DEBUG_SPILL) {
           logger.debug("picked partition {} for early output", victimPartition);
         }
       }
@@ -1510,7 +1506,7 @@ public abstract class HashAggTemplate implements HashAggregator {
    */
   private void updateStats(HashTable[] htables) {
     if (!spilledState.isFirstCycle() ||  // These stats are only for before processing spilled files
-      handleEmit ) { return; } // and no stats collecting when handling an EMIT
+      handleEmit) { return; } // and no stats collecting when handling an EMIT
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
@@ -1521,18 +1517,18 @@ public abstract class HashAggTemplate implements HashAggregator {
         numSpilled++;
       }
     }
-    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
-    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
-    if ( phase.is2nd() ) {
-      this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+    stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+    stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
+    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
+    if (phase.is2nd()) {
+      stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
-    if ( rowsReturnedEarly > 0 ) {
+    if (rowsReturnedEarly > 0) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
-          (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
+          (int) Math.round(rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
     }
   }
 
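The spill decision in the spillIfNeeded() hunk earlier in this file reduces to one predicate: spill when the memory already allocated plus a planned-ahead estimate would exceed the operator's limit. The following sketch is illustrative only and not part of this patch; the values and field names are stand-ins for the operator's actual state, and only the formula mirrors the hunk above.

    // Illustrative only: the HashAgg spill predicate with sample values.
    public class SpillCheckSketch {
      static final int MAX_BATCH_ROW_COUNT = 65536; // assumed per-batch row cap

      public static void main(String[] args) {
        long estMaxBatchSize = 8L << 20;       // hypothetical 8 MB batch estimate
        int minBatchesPerPartition = 2;
        int plannedBatches = 1;
        long currentHashTableSize = 4L << 20;  // may double on the next resize

        // Plan ahead for at least MIN batches per partition, plus the per-row
        // links and hash values, plus a possible hash-table doubling.
        long maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches)
            * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4))
            + currentHashTableSize;

        long allocated = 900L << 20;           // pretend current allocation
        long limit = 1024L << 20;              // pretend operator memory limit

        boolean forceSpill = false;
        boolean spill = forceSpill || allocated + maxMemoryNeeded > limit;
        System.out.println("spill? " + spill);
      }
    }

With these made-up numbers the planned-ahead need is about 21 MB, so no spill is triggered; shrinking the limit or raising plannedBatches flips the decision.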
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
index 586d34d..72e127e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
@@ -122,8 +122,8 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
   public int getRecordCount() { return container.getRecordCount(); }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    this.close(); // delete the current spill file
+  public void cancel() {
+    close(); // delete the current spill file
   }
 
   /**
@@ -143,7 +143,6 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
     }
 
     if (spillStream == null) {
-      lastOutcome = IterOutcome.STOP;
       throw new IllegalStateException("Spill stream was null");
     }
 
@@ -186,11 +185,6 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
         container, spilledBatches, schema, spillFile, spillSet);
   }
 
-  @Override
-  public boolean hasFailed() {
-    return lastOutcome == IterOutcome.STOP;
-  }
-
   /**
    * Note: ignoring any IO errors (e.g. file not found)
    */
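The change in this file shows the new shape of the contract: kill(boolean) becomes cancel(), the STOP outcome and hasFailed() disappear, and errors are reported by throwing. The sketch below is a hypothetical, simplified stand-in (the interface and class names are assumptions, not Drill's real types) for an operator that keeps a spill file under that contract.

    // Hypothetical, simplified stand-in for the new contract (names assumed).
    interface CancellableBatch extends AutoCloseable {
      void cancel();           // replaces kill(boolean sendUpstream)
      @Override void close();  // releases buffers, deletes spill files
    }

    class SpilledBatchSketch implements CancellableBatch {
      private Object spillStream;   // placeholder for the spill input stream
      private boolean closed;

      @Override
      public void cancel() {
        // "Normal" cancellation (e.g. a LIMIT was satisfied): just clean up.
        close();
      }

      @Override
      public void close() {
        if (!closed) {
          closed = true;
          // delete the current spill file, release memory, etc.
        }
      }

      public void nextBatch() {
        // Error conditions are thrown; no STOP outcome is returned anymore.
        if (spillStream == null) {
          throw new IllegalStateException("Spill stream was null");
        }
      }
    }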
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 476b316..7886a5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -169,9 +169,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         state = BatchState.DONE;
         container.buildSchema(SelectionVectorMode.NONE);
         return;
-      case STOP:
-        state = BatchState.STOP;
-        return;
       default:
         break;
     }
@@ -235,8 +232,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           }
           // else fall thru
         case NOT_YET:
-        case STOP:
-          return lastKnownOutcome;
         case OK_NEW_SCHEMA:
           createAggregator();
           firstBatchForSchema = true;
@@ -650,8 +645,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index e2f431b..4a6822d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -150,7 +150,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
                 case NONE:
                   out = IterOutcome.OK_NEW_SCHEMA;
-                case STOP:
                 default:
                   lastOutcome = out;
                   outcome = out;
@@ -328,7 +327,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                     continue outside;
                   }
                 }
-              case STOP:
               default:
                 lastOutcome = out;
                 outcome = out;
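The pattern here (and in the sender below) repeats throughout the patch: the STOP case simply drops out of the switch over the upstream outcome, because a failing child now throws instead of returning a status. A hedged sketch of that dispatch style, using an assumed Outcome enum rather than Drill's IterOutcome:

    // Assumed enum; Drill's real IterOutcome has more states.
    enum Outcome { OK, OK_NEW_SCHEMA, EMIT, NONE, NOT_YET }

    class OutcomeDispatchSketch {
      Outcome handle(Outcome out) {
        // An upstream failure now surfaces as an exception thrown out of
        // next(), so there is no STOP case left to handle here.
        switch (out) {
          case OK:
          case OK_NEW_SCHEMA:
            return out;            // process the incoming batch
          case NONE:
            return Outcome.NONE;   // end of data
          default:
            return out;            // NOT_YET, EMIT: pass through unchanged
        }
      }
    }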
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index be60982..73e464e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -95,7 +95,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
-      case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
index a39d8a1..6fc7154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
@@ -17,6 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.filter;
 
+@SuppressWarnings("serial")
 public class EvalSetupException extends Exception{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EvalSetupException.class);
 }
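The @SuppressWarnings("serial") added here is the usual choice for an internal exception that is never actually serialized; the alternative, shown below as a general-Java sketch with an invented class name (not this class), is to declare an explicit serialVersionUID.

    // General Java alternative: declare the id instead of suppressing.
    class SetupFailureExample extends Exception {
      private static final long serialVersionUID = 1L;

      SetupFailureExample(String message) {
        super(message);
      }
    }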
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
index 6f48efb..d80f818 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
@@ -17,14 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.filter;
 
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
 public class EvaluationPredicate {
-  private SelectionVector2 vector;
-
-  EvaluationPredicate(String pred){
-
-  }
-
-
+  EvaluationPredicate(String pred) { }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index a90c9aa..4179077 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -184,8 +184,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    super.killIncoming(sendUpstream);
+  protected void cancelIncoming() {
+    super.cancelIncoming();
     hasRemainder = false;
   }
 
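Operators that carry per-batch state follow the override pattern visible above: delegate the cancel to the parent, then reset any local flags so a later next() does not resume a half-emitted batch. A minimal, hypothetical sketch of that shape (the base class is an assumption standing in for Drill's operator hierarchy):

    // Hypothetical base class standing in for the real operator hierarchy.
    abstract class BaseOperatorSketch {
      protected void cancelIncoming() {
        // the base class would forward the cancel to its incoming batch
      }
    }

    class FlattenLikeSketch extends BaseOperatorSketch {
      private boolean hasRemainder;   // true while a flattened row spans batches

      @Override
      protected void cancelIncoming() {
        super.cancelIncoming();       // propagate the cancel upstream
        hasRemainder = false;         // drop any partially emitted output
      }
    }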
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index fe38244..f2eb611 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -23,7 +23,6 @@ import javax.inject.Named;
 
 import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
@@ -42,7 +41,6 @@ public abstract class FlattenTemplate implements Flattener {
   private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
 
   private ImmutableList<TransferPair> transfers;
-  private BufferAllocator outputAllocator;
   private SelectionVectorMode svMode;
   private RepeatedValueVector fieldToFlatten;
   private RepeatedValueVector.RepeatedAccessor accessor;
@@ -158,16 +156,16 @@ public abstract class FlattenTemplate implements Flattener {
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
       case TWO_BYTE:
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
+      default:
     }
     this.transfers = ImmutableList.copyOf(transfers);
-    outputAllocator = outgoing.getOutgoingContainer().getAllocator();
     doSetup(context, incoming, outgoing);
   }
 
   @Override
   public void resetGroupIndex() {
-    this.valueIndex = 0;
-    this.currentInnerValueIndex = 0;
+    valueIndex = 0;
+    currentInnerValueIndex = 0;
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index eeedc72..843d20b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -96,8 +96,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implements the runtime execution for the Hash-Join operator
- * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
+ * Implements the runtime execution for the Hash-Join operator supporting INNER,
+ * LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
  * <p>
  * This implementation splits the incoming Build side rows into multiple
  * Partitions, thus allowing spilling of some of these partitions to disk if
@@ -125,8 +125,10 @@ import org.slf4j.LoggerFactory;
  * greater) is a waste, indicating that the number of partitions chosen was too
  * small.
  */
-public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
-  private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
+public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
+    implements RowKeyJoin {
+  private static final Logger logger = LoggerFactory
+      .getLogger(HashJoinBatch.class);
 
   /**
    * The maximum number of records within each internal batch.
@@ -138,7 +140,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private final boolean semiJoin;
   private final boolean joinIsLeftOrFull;
   private final boolean joinIsRightOrFull;
-  private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
+  private boolean skipHashTableBuild; // when outer side is empty, and the join
+                                      // is inner or left (see DRILL-6755)
 
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -159,7 +162,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   // Fields used for partitioning
   /**
    * The number of {@link HashPartition}s. This is configured via a system
-   * option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
+   * option and set in
+   * {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
 
@@ -170,7 +174,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
   private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
   private boolean canSpill = true;
-  private boolean wasKilled; // a kill was received, may need to clean spilled partns
+  private boolean wasKilled; // a cancel was received; may need to clean up
+                             // spilled partitions
 
   /**
    * This array holds the currently active {@link HashPartition}s.
@@ -190,7 +195,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   private final JoinControl joinControl;
 
-  // An iterator over the build side hash table (only applicable for row-key joins)
+  // An iterator over the build side hash table (only applicable for row-key
+  // joins)
   private boolean buildComplete;
 
   // indicates if we have previously returned an output batch
@@ -198,16 +204,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   private int rightHVColPosition;
   private final BufferAllocator allocator;
-  // Local fields for left/right incoming - may be replaced when reading from spilled
+  // Local fields for left/right incoming - may be replaced when reading from
+  // spilled
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
 
   /**
-   * Flag indicating whether or not the first data holding build batch needs to be fetched.
+   * Flag indicating whether or not the first data holding build batch needs to
+   * be fetched.
    */
   private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
   /**
-   * Flag indicating whether or not the first data holding probe batch needs to be fetched.
+   * Flag indicating whether or not the first data holding probe batch needs to
+   * be fetched.
    */
   private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
 
@@ -215,10 +224,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private final SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private final int originalPartition = -1; // the partition a secondary reads from
-  IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
+  private final int originalPartition = -1; // the partition a secondary reads
+                                            // from
+  IntVector read_right_HV_vector; // HV vector that was read from the spilled
+                                  // batch
   private final int maxBatchesInMemory;
-  private final List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
+  private final List<String> probeFields = new ArrayList<>(); // keep the same
+                                                              // sequence as the
+                                                              // bloomFilters
   private boolean enableRuntimeFilter;
   private RuntimeFilterReporter runtimeFilterReporter;
   private ValueVectorHashHelper.Hash64 hash64;
@@ -228,20 +242,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private boolean bloomFiltersGenerated;
 
   /**
-   * This holds information about the spilled partitions for the build and probe side.
+   * This holds information about the spilled partitions for the build and probe
+   * side.
    */
-  public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata {
+  public static class HashJoinSpilledPartition
+      extends AbstractSpilledPartitionMetadata {
     private final int innerSpilledBatches;
     private final String innerSpillFile;
     private int outerSpilledBatches;
     private String outerSpillFile;
     private boolean updatedOuter;
 
-    public HashJoinSpilledPartition(int cycle,
-                                    int originPartition,
-                                    int prevOriginPartition,
-                                    int innerSpilledBatches,
-                                    String innerSpillFile) {
+    public HashJoinSpilledPartition(int cycle, int originPartition,
+        int prevOriginPartition, int innerSpilledBatches,
+        String innerSpillFile) {
       super(cycle, originPartition, prevOriginPartition);
 
       this.innerSpilledBatches = innerSpilledBatches;
@@ -276,8 +290,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     @Override
     public String makeDebugString() {
-      return String.format("Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
-        this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle(), outerSpilledBatches, innerSpilledBatches);
+      return String.format(
+          "Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
+          this.getOriginPartition(), this.getPrevOriginPartition(),
+          this.getCycle(), outerSpilledBatches, innerSpilledBatches);
     }
   }
 
@@ -308,35 +324,28 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
    */
   private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
   private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
-  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
+  private HashJoinSpilledPartition spilledInners[]; // for the outer to find the
+                                                    // partition
 
   public enum Metric implements MetricDef {
-    NUM_BUCKETS,
-    NUM_ENTRIES,
-    NUM_RESIZING,
-    RESIZING_TIME_MS,
-    NUM_PARTITIONS,
-    SPILLED_PARTITIONS, // number of original partitions spilled to disk
-    SPILL_MB,         // Number of MB of data spilled to disk. This amount is first written,
-                      // then later re-read. So, disk I/O is twice this amount.
-    SPILL_CYCLE,       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
-    LEFT_INPUT_BATCH_COUNT,
-    LEFT_AVG_INPUT_BATCH_BYTES,
-    LEFT_AVG_INPUT_ROW_BYTES,
-    LEFT_INPUT_RECORD_COUNT,
-    RIGHT_INPUT_BATCH_COUNT,
-    RIGHT_AVG_INPUT_BATCH_BYTES,
-    RIGHT_AVG_INPUT_ROW_BYTES,
-    RIGHT_INPUT_RECORD_COUNT,
-    OUTPUT_BATCH_COUNT,
-    AVG_OUTPUT_BATCH_BYTES,
-    AVG_OUTPUT_ROW_BYTES,
-    OUTPUT_RECORD_COUNT;
+    NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS, NUM_PARTITIONS,
+    // number of original partitions spilled to disk
+    SPILLED_PARTITIONS,
+    SPILL_MB, // Number of MB of data spilled to disk. This amount is first
+              // written, then later re-read. So, disk I/O is twice this amount.
+    SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    LEFT_INPUT_BATCH_COUNT, LEFT_AVG_INPUT_BATCH_BYTES, LEFT_AVG_INPUT_ROW_BYTES,
+    LEFT_INPUT_RECORD_COUNT, RIGHT_INPUT_BATCH_COUNT, RIGHT_AVG_INPUT_BATCH_BYTES,
+    RIGHT_AVG_INPUT_ROW_BYTES, RIGHT_INPUT_RECORD_COUNT, OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES, AVG_OUTPUT_ROW_BYTES, OUTPUT_RECORD_COUNT;
 
    // duplicate for hash agg
 
     @Override
-    public int metricId() { return ordinal(); }
+    public int metricId() {
+      return ordinal();
+    }
   }
 
   @Override
@@ -352,7 +361,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     if (validSchema) {
       // We are able to construct a valid schema from the upstream data.
-      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      // Setting the state here makes sure AbstractRecordBatch returns
+      // OK_NEW_SCHEMA
       state = BatchState.BUILD_SCHEMA;
 
       if (leftUpstream == OK_NEW_SCHEMA) {
@@ -366,7 +376,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         rightHVColPosition = right.getContainer().getNumberOfColumns();
         // In special cases, when the probe side is empty, and
         // inner/left join - no need for Hash Table
-        skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
+        skipHashTableBuild = leftUpstream == IterOutcome.NONE
+            && !joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
       }
@@ -376,7 +387,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     // If we have a valid schema, this will build a valid container.
     // If we were unable to obtain a valid schema,
-    // we still need to build a dummy schema. This code handles both cases for us.
+    // we still need to build a dummy schema. This code handles both cases for
+    // us.
     setupOutputContainerSchema();
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     container.setEmpty();
@@ -386,52 +398,52 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
    * Prefetches the first build side data holding batch.
    */
   private void prefetchFirstBuildBatch() {
-    rightUpstream = prefetchFirstBatch(rightUpstream,
-      prefetchedBuild,
-      buildSideIsEmpty,
-      RIGHT_INDEX,
-      buildBatch,
-      () -> {
-        batchMemoryManager.update(RIGHT_INDEX, 0, true);
-        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
-            batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
-            getRecordBatchStatsContext());
-      });
+    rightUpstream = prefetchFirstBatch(rightUpstream, prefetchedBuild,
+        buildSideIsEmpty, RIGHT_INDEX, buildBatch, () -> {
+          batchMemoryManager.update(RIGHT_INDEX, 0, true);
+          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+              batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
+              getRecordBatchStatsContext());
+        });
   }
 
   /**
    * Prefetches the first build side data holding batch.
    */
   private void prefetchFirstProbeBatch() {
-    leftUpstream =  prefetchFirstBatch(leftUpstream,
-      prefetchedProbe,
-      probeSideIsEmpty,
-      LEFT_INDEX,
-      probeBatch,
-      () -> {
-        batchMemoryManager.update(LEFT_INDEX, 0);
-        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
-            batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
-            getRecordBatchStatsContext());
-      });
+    leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe,
+        probeSideIsEmpty, LEFT_INDEX, probeBatch, () -> {
+          batchMemoryManager.update(LEFT_INDEX, 0);
+          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+              batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
+              getRecordBatchStatsContext());
+        });
   }
 
   /**
-   * Used to fetch the first data holding batch from either the build or probe side.
-   * @param outcome The current upstream outcome for either the build or probe side.
-   * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side.
-   * @param isEmpty A flag indicating if the probe or build side is empty.
-   * @param index The upstream index of the probe or build batch.
-   * @param batch The probe or build batch itself.
-   * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch.
-   * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
+   * Used to fetch the first data holding batch from either the build or probe
+   * side.
+   *
+   * @param outcome
+   *          The current upstream outcome for either the build or probe side.
+   * @param prefetched
+   *          A flag indicating if we have already done a prefetch of the first
+   *          data holding batch for the probe or build side.
+   * @param isEmpty
+   *          A flag indicating if the probe or build side is empty.
+   * @param index
+   *          The upstream index of the probe or build batch.
+   * @param batch
+   *          The probe or build batch itself.
+   * @param memoryManagerUpdate
+   *          A lambda function to execute the memory manager update for the
+   *          probe or build batch.
+   * @return The current
+   *         {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
    */
   private IterOutcome prefetchFirstBatch(IterOutcome outcome,
-                                         MutableBoolean prefetched,
-                                         MutableBoolean isEmpty,
-                                         int index,
-                                         RecordBatch batch,
-                                         Runnable memoryManagerUpdate) {
+      MutableBoolean prefetched, MutableBoolean isEmpty, int index,
+      RecordBatch batch, Runnable memoryManagerUpdate) {
     if (prefetched.booleanValue()) {
       // We have already prefetch the first data holding batch
       return outcome;
@@ -445,32 +457,34 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       outcome = sniffNonEmptyBatch(outcome, index, batch);
     }
 
-    isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
+    isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there
+                                                   // is no data.
 
-    if (outcome == IterOutcome.STOP) {
-      // We reached a termination state
-      state = BatchState.STOP;
-    } else {
-      // Got our first batch(es)
-      if (spilledState.isFirstCycle()) {
-        // Only collect stats for the first cycle
-        memoryManagerUpdate.run();
-      }
-      state = BatchState.FIRST;
+    // Got our first batch(es)
+    if (spilledState.isFirstCycle()) {
+      // Only collect stats for the first cycle
+      memoryManagerUpdate.run();
     }
-
+    state = BatchState.FIRST;
     return outcome;
   }
 
   /**
-   * Currently in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method
-   * fetches the first non-empty batch from the probe or build side.
-   * @param curr The current outcome.
-   * @param inputIndex Index specifying whether to work with the prorbe or build input.
-   * @param recordBatch The probe or build record batch.
-   * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
+   * Currently in order to accurately predict memory usage for spilling, the
+   * first non-empty build or probe side batch is needed. This method fetches
+   * the first non-empty batch from the probe or build side.
+   *
+   * @param curr
+   *          The current outcome.
+   * @param inputIndex
+   *          Index specifying whether to work with the probe or build input.
+   * @param recordBatch
+   *          The probe or build record batch.
+   * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}
+   *         for the left or right record batch.
    */
-  private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
+  private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex,
+      RecordBatch recordBatch) {
     while (true) {
       if (recordBatch.getRecordCount() != 0) {
         return curr;
@@ -479,34 +493,41 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       curr = next(inputIndex, recordBatch);
 
       switch (curr) {
-        case OK:
-          // We got a data batch
-          break;
-        case NOT_YET:
-          // We need to try again
-          break;
-        case EMIT:
-          throw new UnsupportedOperationException("We do not support " + EMIT);
-        default:
-          // Other cases are termination conditions
-          return curr;
+      case OK:
+        // We got a data batch
+        break;
+      case NOT_YET:
+        // We need to try again
+        break;
+      case EMIT:
+        throw new UnsupportedOperationException("We do not support " + EMIT);
+      default:
+        // Other cases are termination conditions
+        return curr;
       }
     }
   }
 
   /**
-   * Determines the memory calculator to use. If maxNumBatches is configured simple batch counting is used to spill. Otherwise
-   * memory calculations are used to determine when to spill.
+   * Determines the memory calculator to use. If maxNumBatches is configured,
+   * simple batch counting is used to decide when to spill. Otherwise, memory
+   * calculations are used to determine when to spill.
+   *
    * @return The memory calculator to use.
    */
   public HashJoinMemoryCalculator getCalculatorImpl() {
     if (maxBatchesInMemory == 0) {
-      double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
-      double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
-      double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
-      String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
-
-      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
+      double safetyFactor = context.getOptions()
+          .getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
+      double fragmentationFactor = context.getOptions()
+          .getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
+      double hashTableDoublingFactor = context.getOptions()
+          .getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
+      String hashTableCalculatorType = context.getOptions()
+          .getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+
+      return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor,
+          hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
     } else {
       return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
     }
@@ -515,30 +536,32 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   @Override
   public IterOutcome innerNext() {
     if (wasKilled) {
-      // We have received a kill signal. We need to stop processing.
-      this.cleanup();
-      super.close();
+      // We have received a cancel signal. We need to stop processing.
+      cleanup();
       return IterOutcome.NONE;
     }
 
     prefetchFirstBuildBatch();
 
     if (rightUpstream.isError()) {
-      // A termination condition was reached while prefetching the first build side data holding batch.
+      // A termination condition was reached while prefetching the first build
+      // side data holding batch.
       // We need to terminate.
       return rightUpstream;
     }
 
     try {
-      /* If we are here for the first time, execute the build phase of the
-       * hash join and setup the run time generated class for the probe side
+      /*
+       * If we are here for the first time, execute the build phase of the hash
+       * join and setup the run time generated class for the probe side
        */
       if (state == BatchState.FIRST) {
         // Build the hash table, using the build side record batches.
         IterOutcome buildExecuteTermination = executeBuildPhase();
 
         if (buildExecuteTermination != null) {
-          // A termination condition was reached while executing the build phase.
+          // A termination condition was reached while executing the build
+          // phase.
           // We need to terminate.
           return buildExecuteTermination;
         }
@@ -546,14 +569,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         buildComplete = true;
 
         if (isRowKeyJoin) {
-          // discard the first left batch which was fetched by buildSchema, and get the new
+          // discard the first left batch which was fetched by buildSchema, and
+          // get the new
           // one based on rowkey join
           leftUpstream = next(left);
-
-          if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-            state = BatchState.STOP;
-            return leftUpstream;
-          }
         }
 
         // Update the hash table related stats for the operator
@@ -561,53 +580,51 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       }
 
       // Try to probe and project, or recursively handle a spilled partition
-      if (!buildSideIsEmpty.booleanValue() ||  // If there are build-side rows
-          joinIsLeftOrFull) {  // or if this is a left/full outer join
+      if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
+          joinIsLeftOrFull) { // or if this is a left/full outer join
 
         prefetchFirstProbeBatch();
 
-        if (leftUpstream.isError() ||
-            ( leftUpstream == NONE && ! joinIsRightOrFull )) {
-          // A termination condition was reached while prefetching the first probe side data holding batch.
+        if (leftUpstream.isError()
+            || (leftUpstream == NONE && !joinIsRightOrFull)) {
+          // A termination condition was reached while prefetching the first
+          // probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
         }
 
-        if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) {
-          // Only allocate outgoing vectors and execute probing logic if there is data
+        if (!buildSideIsEmpty.booleanValue()
+            || !probeSideIsEmpty.booleanValue()) {
+          // Only allocate outgoing vectors and execute probing logic if there
+          // is data
 
           if (state == BatchState.FIRST) {
             // Initialize various settings for the probe side
-            hashJoinProbe.setupHashJoinProbe(probeBatch,
-              this,
-              joinType,
-              semiJoin,
-              leftUpstream,
-              partitions,
-              spilledState.getCycle(),
-              container,
-              spilledInners,
-              buildSideIsEmpty.booleanValue(),
-              numPartitions,
-              rightHVColPosition);
+            hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType,
+                semiJoin, leftUpstream, partitions, spilledState.getCycle(),
+                container, spilledInners, buildSideIsEmpty.booleanValue(),
+                numPartitions, rightHVColPosition);
           }
 
           // Allocate the memory for the vectors in the output container
           batchMemoryManager.allocateVectors(container);
 
-          hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+          hashJoinProbe
+              .setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
           outputRecords = hashJoinProbe.probeAndProject();
 
           container.setValueCount(outputRecords);
 
           batchMemoryManager.updateOutgoingStats(outputRecords);
-          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
-
-          /* We are here because of one the following
-           * 1. Completed processing of all the records and we are done
-           * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
-           * Either case build the output container's schema and return
+          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this,
+              getRecordBatchStatsContext());
+
+          /*
+           * We are here because of one of the following:
+           * 1. We completed processing all the records and we are done.
+           * 2. We filled the outgoing batch to the maximum and need to return upstream.
+           * In either case, build the output container's schema and return.
            */
           if (outputRecords > 0 || state == BatchState.FIRST) {
             state = BatchState.NOT_FIRST;
@@ -623,47 +640,66 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         }
 
         //
-        //  (recursively) Handle the spilled partitions, if any
+        // (recursively) Handle the spilled partitions, if any
         //
         if (!buildSideIsEmpty.booleanValue()) {
-          while (!spilledState.isEmpty()) {  // "while" is only used for skipping; see "continue" below
+          while (!spilledState.isEmpty()) { // "while" is only used for
+                                            // skipping; see "continue" below
 
             // Get the next (previously) spilled partition to handle as incoming
-            HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
+            HashJoinSpilledPartition currSp = spilledState
+                .getNextSpilledPartition();
 
-            // If the outer is empty (and it's not a right/full join) - try the next spilled partition
+            // If the outer is empty (and it's not a right/full join) - try the
+            // next spilled partition
             if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
               continue;
             }
 
-            // Create a BUILD-side "incoming" out of the inner spill file of that partition
-            buildBatch = new SpilledRecordBatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-            // The above ctor call also got the first batch; need to update the outcome
-            rightUpstream = ((SpilledRecordBatch) buildBatch).getInitialOutcome();
+            // Create a BUILD-side "incoming" out of the inner spill file of
+            // that partition
+            buildBatch = new SpilledRecordBatch(currSp.innerSpillFile,
+                currSp.innerSpilledBatches, context, buildSchema, oContext,
+                spillSet);
+            // The above ctor call also got the first batch; need to update the
+            // outcome
+            rightUpstream = ((SpilledRecordBatch) buildBatch)
+                .getInitialOutcome();
 
             if (currSp.outerSpilledBatches > 0) {
-              // Create a PROBE-side "incoming" out of the outer spill file of that partition
-              probeBatch = new SpilledRecordBatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
-              // The above ctor call also got the first batch; need to update the outcome
-              leftUpstream = ((SpilledRecordBatch) probeBatch).getInitialOutcome();
+              // Create a PROBE-side "incoming" out of the outer spill file of
+              // that partition
+              probeBatch = new SpilledRecordBatch(currSp.outerSpillFile,
+                  currSp.outerSpilledBatches, context, probeSchema, oContext,
+                  spillSet);
+              // The above ctor call also got the first batch; need to update
+              // the outcome
+              leftUpstream = ((SpilledRecordBatch) probeBatch)
+                  .getInitialOutcome();
             } else {
-              probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+              probeBatch = left; // if no outer batch then reuse left - needed
+                                 // for updateIncoming()
               leftUpstream = IterOutcome.NONE;
               hashJoinProbe.changeToFinalProbeState();
             }
 
             spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-            state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
+            state = BatchState.FIRST; // TODO need to determine if this is still
+                                      // necessary since
+                                      // prefetchFirstBatchFromBothSides sets
+                                      // this
 
             prefetchedBuild.setValue(false);
             prefetchedProbe.setValue(false);
 
-            return innerNext(); // start processing the next spilled partition "recursively"
+            return innerNext(); // start processing the next spilled partition
+                                // "recursively"
           }
         }
 
       } else {
-        // Our build side is empty, we won't have any matches, clear the probe side
+        // Our build side is empty, we won't have any matches, clear the probe
+        // side
         killAndDrainLeftUpstream();
       }
 
@@ -679,67 +715,91 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   }
 
   /**
-   * In case an upstream data is no longer needed, send a kill and flush any remaining batch
+   * In case upstream data is no longer needed, cancel the upstream and drain
+   * any remaining batches
    *
-   * @param batch probe or build batch
-   * @param upstream which upstream
-   * @param isLeft is it the left or right
+   * @param batch
+   *          probe or build batch
+   * @param upstream
+   *          which upstream
+   * @param isLeft
+   *          is it the left or right
    */
-  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
-    batch.kill(true);
-    while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream,
+      boolean isLeft) {
+    batch.cancel();
+    while (upstream == IterOutcome.OK_NEW_SCHEMA
+        || upstream == IterOutcome.OK) {
       VectorAccessibleUtilities.clear(batch);
-      upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+      upstream = next(
+          isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT,
+          batch);
     }
   }
 
-  private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
-  private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
+  private void killAndDrainLeftUpstream() {
+    killAndDrainUpstream(probeBatch, leftUpstream, true);
+  }
+
+  private void killAndDrainRightUpstream() {
+    killAndDrainUpstream(buildBatch, rightUpstream, false);
+  }
 
   private void setupHashTable() {
-    List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
-    conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+    List<Comparator> comparators = Lists
+        .newArrayListWithExpectedSize(conditions.size());
+    conditions.forEach(cond -> comparators
+        .add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
-    if ( skipHashTableBuild ) { return; }
+    if (skipHashTableBuild) {
+      return;
+    }
 
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
     // Create named expressions from the conditions
     for (int i = 0; i < conditions.size(); i++) {
-      leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));
+      leftExpr.add(new NamedExpression(conditions.get(i).getLeft(),
+          new FieldReference("probe_side_" + i)));
     }
 
     // Set the left named expression to be null if the probe batch is empty.
-    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA
+        && leftUpstream != IterOutcome.OK) {
       leftExpr = null;
     } else {
-      if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-        throw UserException.internalError(null)
-          .message("Hash join does not support probe batch with selection vectors.")
-          .addContext("Probe batch has selection mode",
-              (probeBatch.getSchema().getSelectionVectorMode()).toString())
-          .build(logger);
+      if (probeBatch.getSchema()
+          .getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        throw UserException.internalError(null).message(
+            "Hash join does not support probe batch with selection vectors.")
+            .addContext("Probe batch has selection mode",
+                (probeBatch.getSchema().getSelectionVectorMode()).toString())
+            .build(logger);
       }
     }
 
-    HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-      true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
+    HashTableConfig htConfig = new HashTableConfig(
+        (int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+        true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators,
+        joinControl.asInt());
 
     // Create the chained hash table
-    baseHashTable =
-      new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
+    baseHashTable = new ChainedHashTable(htConfig, context, allocator,
+        buildBatch, probeBatch, null);
     if (enableRuntimeFilter) {
       setupHash64(htConfig);
     }
   }
 
   private void setupHash64(HashTableConfig htConfig) {
-    LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()];
+    LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig
+        .getKeyExprsBuild().size()];
     ErrorCollector collector = new ErrorCollectorImpl();
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+          ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
       collector.reportErrors(logger);
       if (expr == null) {
         continue;
@@ -761,15 +821,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       i++;
     }
     if (missingField) {
-      logger.info("As some build side key fields not found, runtime filter was disabled");
+      logger.info(
+          "As some build side key fields not found, runtime filter was disabled");
       enableRuntimeFilter = false;
       return;
     }
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
-    List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+    List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
+        .getBloomFilterDefs();
     for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
       String buildField = bloomFilterDef.getBuildField();
-      SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+      SchemaPath schemaPath = new SchemaPath(
+          new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
       TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
       if (typedFieldId == null) {
         missingField = true;
@@ -779,28 +842,30 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
     }
     if (missingField) {
-      logger.info("As some build side join key fields not found, runtime filter was disabled");
+      logger.info(
+          "As some build side join key fields not found, runtime filter was disabled");
       enableRuntimeFilter = false;
       return;
     }
-    ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
+    ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch,
+        context);
     try {
       hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
     } catch (Exception e) {
       throw UserException.internalError(e)
-            .message("Failed to construct a field's hash64 dynamic codes")
-            .build(logger);
+          .message("Failed to construct a field's hash64 dynamic codes")
+          .build(logger);
     }
   }
 
   /**
-   *  Call only after num partitions is known
+   * Call only after num partitions is known
    */
   private void delayedSetup() {
     //
-    //  Find out the estimated max batch size, etc
-    //  and compute the max numPartitions possible
-    //  See partitionNumTuning()
+    // Find out the estimated max batch size, etc
+    // and compute the max numPartitions possible
+    // See partitionNumTuning()
     //
 
     spilledState.initialize(numPartitions);
@@ -812,11 +877,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
    * Initialize fields (that may be reused when reading spilled partitions)
    */
   private void initializeBuild() {
-    baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
+    baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process
+                                                          // the spilled files
     // Recreate the partitions every time build is initialized
-    for (int part = 0; part < numPartitions; part++ ) {
-      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin,
-        RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
+    for (int part = 0; part < numPartitions; part++) {
+      partitions[part] = new HashPartition(context, allocator, baseHashTable,
+          buildBatch, probeBatch, semiJoin, RECORDS_PER_BATCH, spillSet, part,
+          spilledState.getCycle(), numPartitions);
     }
 
     spilledInners = new HashJoinSpilledPartition[numPartitions];
@@ -824,25 +891,29 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   }
 
   /**
-   * Note:
-   * This method can not be called again as part of recursive call of executeBuildPhase() to handle spilled build partitions.
+   * Note: This method cannot be called again as part of a recursive call of
+   * executeBuildPhase() to handle spilled build partitions.
    */
   private void initializeRuntimeFilter() {
     if (!enableRuntimeFilter || bloomFiltersGenerated) {
       return;
     }
-    runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
+    runtimeFilterReporter = new RuntimeFilterReporter(
+        (ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
-    //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
-    //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
+    // A RuntimeFilter is not a necessary part of a HashJoin operator; only
+    // queries that satisfy the RuntimeFilterRouter's judgement will have the
+    // RuntimeFilterDef.
     if (runtimeFilterDef != null) {
-      List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+      List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef
+          .getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
         int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
         int numBytes = bloomFilterDef.getNumBytes();
-        String probeField =  bloomFilterDef.getProbeField();
+        String probeField = bloomFilterDef.getProbeField();
         probeFields.add(probeField);
-        BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
+        BloomFilter bloomFilter = new BloomFilter(numBytes,
+            context.getAllocator());
         bloomFilters.add(bloomFilter);
         bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
@@ -851,15 +922,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   }
 
   /**
-   * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not possible to spill it gives up and reverts
-   * to unbounded in memory operation.
-   * @param maxBatchSize
-   * @param buildCalc
-   * @return
+   * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not
+   * possible to spill, it gives up and reverts to unbounded in-memory operation.
    */
   private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
-    int maxBatchSize,
-    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
+      int maxBatchSize,
+      HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
     // Get auto tuning result
     numPartitions = buildCalc.getNumPartitions();
 
@@ -868,15 +936,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     }
 
     if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
-      // We don't have enough memory to do any spilling. Give up and do no spilling and have no limits
-
-      // TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
-      // We don't have enough memory to do partitioning, we have to do everything in memory
-      String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
-          "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
-        numPartitions,
-        FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
-        FileUtils.byteCountToDisplaySize(allocator.getLimit()));
+      // We don't have enough memory to do any spilling. Give up and do no
+      // spilling and have no limits
+
+      // TODO dirty hack to prevent regressions. Remove this once batch sizing
+      // is implemented.
+      // We don't have enough memory to do partitioning, we have to do
+      // everything in memory
+      String message = String.format(
+          "When using the minimum number of partitions %d we require %s memory but only have %s available. "
+              + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
+          numPartitions,
+          FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
+          FileUtils.byteCountToDisplaySize(allocator.getLimit()));
       logger.warn(message);
 
       // create a Noop memory calculator
@@ -884,20 +956,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       calc.initialize(false);
       buildCalc = calc.next();
 
-      buildCalc.initialize(true,
-        true, // TODO Fix after growing hash values bug fixed
-        buildBatch,
-        probeBatch,
-        buildJoinColumns,
-        leftUpstream == IterOutcome.NONE, // probeEmpty
-        allocator.getLimit(),
-        numPartitions,
-        RECORDS_PER_BATCH,
-        RECORDS_PER_BATCH,
-        maxBatchSize,
-        maxBatchSize,
-        batchMemoryManager.getOutputBatchSize(),
-        HashTable.DEFAULT_LOAD_FACTOR);
+      buildCalc.initialize(true, true, // TODO Fix after growing hash values bug
+                                       // fixed
+          buildBatch, probeBatch, buildJoinColumns,
+          leftUpstream == IterOutcome.NONE, // probeEmpty
+          allocator.getLimit(), numPartitions, RECORDS_PER_BATCH,
+          RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
+          batchMemoryManager.getOutputBatchSize(),
+          HashTable.DEFAULT_LOAD_FACTOR);
 
       disableSpilling(null);
     }
@@ -906,20 +972,29 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   }
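
The tuning above delegates to the memory calculator; the gist is that each candidate partition count must leave room for one in-flight batch per partition plus the incoming batch, and the count is halved until that fits (or spilling is abandoned). A simplified sketch of that idea with hypothetical sizes, not Drill's calculator classes:

// Illustrative only: halve the partition count until the memory reserved for one
// in-flight batch per partition, plus the incoming batch, fits within the limit.
// All sizes and the fallback here are hypothetical stand-ins.
public class PartitionTuningSketch {

  static int tunePartitions(int initialPartitions, long memoryLimit,
                            long partitionBatchSize, long incomingBatchSize) {
    for (int partitions = initialPartitions; partitions > 1; partitions /= 2) {
      long reserved = (long) partitions * partitionBatchSize + incomingBatchSize;
      if (reserved <= memoryLimit) {
        return partitions; // enough room left to spill with this many partitions
      }
    }
    return 1; // give up on spilling: single partition, unbounded memory
  }

  public static void main(String[] args) {
    // 32 -> 16 -> 8 partitions before the reservation fits under 64 MB.
    System.out.println(tunePartitions(32, 64_000_000L, 4_000_000L, 8_000_000L)); // prints 8
  }
}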
 
   /**
-   *  Disable spilling - use only a single partition and set the memory limit to the max ( 10GB )
-   *  @param reason If not null - log this as warning, else check fallback setting to either warn or fail.
+   * Disable spilling - use only a single partition and set the memory limit to
+   * the max (10 GB)
+   *
+   * @param reason
+   *          If not null - log this as warning, else check fallback setting to
+   *          either warn or fail.
    */
   private void disableSpilling(String reason) {
-    // Fail, or just issue a warning if a reason was given, or a fallback option is enabled
-    if ( reason == null ) {
-      boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+    // Fail, or just issue a warning if a reason was given, or a fallback option
+    // is enabled
+    if (reason == null) {
+      boolean fallbackEnabled = context.getOptions()
+          .getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
       if (fallbackEnabled) {
-        logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
-          " to use unbounded memory");
+        logger.warn(
+            "Spilling is disabled - not enough memory available for internal partitioning. Falling back"
+                + " to use unbounded memory");
       } else {
-        throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " +
-          "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter " +
-          "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
+        throw UserException.resourceError().message(String.format(
+            "Not enough memory for internal partitioning and fallback mechanism for "
+                + "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter "
+                + "session/system command or increase memory limit for Drillbit",
+            ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
       }
     } else {
       logger.warn(reason);
@@ -927,14 +1002,17 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     numPartitions = 1; // We are only using one partition
     canSpill = false; // We cannot spill
-    allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+    allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and
+                                                     // force unbounded memory
   }
 
   /**
-   *  Execute the BUILD phase; first read incoming and split rows into partitions;
-   *  may decide to spill some of the partitions
+   * Execute the BUILD phase; first read incoming and split rows into
+   * partitions; may decide to spill some of the partitions
    *
-   * @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null.
+   * @return Returns an
+   *         {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a
+   *         termination condition is reached. Otherwise returns null.
    * @throws SchemaChangeException
    */
   public IterOutcome executeBuildPhase() throws SchemaChangeException {
@@ -943,7 +1021,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       return null;
     }
 
-    if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+    if (skipHashTableBuild) { // No hash table needed - then consume all the
+                              // right upstream
       killAndDrainRightUpstream();
       return null;
     }
@@ -953,26 +1032,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
-      boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
+      int maxBatchSize = spilledState.isFirstCycle()
+          ? RecordBatch.MAX_BATCH_ROW_COUNT
+          : RECORDS_PER_BATCH;
+      boolean doMemoryCalculation = canSpill
+          && !probeSideIsEmpty.booleanValue();
       HashJoinMemoryCalculator calc = getCalculatorImpl();
 
       calc.initialize(doMemoryCalculation);
       buildCalc = calc.next();
 
-      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
-        buildBatch,
-        probeBatch,
-        buildJoinColumns,
-        probeSideIsEmpty.booleanValue(),
-        allocator.getLimit(),
-        numPartitions,
-        RECORDS_PER_BATCH,
-        RECORDS_PER_BATCH,
-        maxBatchSize,
-        maxBatchSize,
-        batchMemoryManager.getOutputBatchSize(),
-        HashTable.DEFAULT_LOAD_FACTOR);
+      buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after
+                                                              // growing hash
+                                                              // values bug
+                                                              // fixed
+          buildBatch, probeBatch, buildJoinColumns,
+          probeSideIsEmpty.booleanValue(), allocator.getLimit(), numPartitions,
+          RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
+          batchMemoryManager.getOutputBatchSize(),
+          HashTable.DEFAULT_LOAD_FACTOR);
 
       if (spilledState.isFirstCycle() && doMemoryCalculation) {
         // Do auto tuning
@@ -990,7 +1068,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     initializeRuntimeFilter();
 
     // Make the calculator aware of our partitions
-    HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
+    HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(
+        partitions);
     buildCalc.setPartitionStatSet(partitionStatSet);
 
     boolean moreData = true;
@@ -998,22 +1077,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       switch (rightUpstream) {
       case NONE:
       case NOT_YET:
-      case STOP:
         moreData = false;
         continue;
 
       case OK_NEW_SCHEMA:
         if (!buildSchema.equals(buildBatch.getSchema())) {
-          throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema());
+          throw SchemaChangeException.schemaChanged(
+              "Hash join does not support schema changes in build side.",
+              buildSchema, buildBatch.getSchema());
+        }
+        for (HashPartition partn : partitions) {
+          partn.updateBatches();
         }
-        for (HashPartition partn : partitions) { partn.updateBatches(); }
         // Fall through
       case OK:
         batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
         int currentRecordCount = buildBatch.getRecordCount();
-        //create runtime filter
+        // create runtime filter
         if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-          //create runtime filter and send out async
+          // create runtime filter and send out async
           for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
             int fieldId = bloomFilter2buildId.get(bloomFilter);
             for (int ind = 0; ind < currentRecordCount; ind++) {
@@ -1022,31 +1104,39 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
             }
           }
         }
-        // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy)
-        if ( numPartitions == 1 ) {
+        // Special treatment (when no spill, and single partition) -- use the
+        // incoming vectors as they are (no row copy)
+        if (numPartitions == 1) {
           partitions[0].appendBatch(buildBatch);
           break;
         }
 
         if (!spilledState.isFirstCycle()) {
-          read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+          read_right_HV_vector = (IntVector) buildBatch.getContainer()
+              .getLast();
         }
 
-        // For every record in the build batch, hash the key columns and keep the result
+        // For every record in the build batch, hash the key columns and keep
+        // the result
         for (int ind = 0; ind < currentRecordCount; ind++) {
-          int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
-            : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
+          int hashCode = spilledState.isFirstCycle()
+              ? partitions[0].getBuildHashCode(ind)
+              : read_right_HV_vector.getAccessor().get(ind); // get the hash
+                                                             // value from the
+                                                             // HV column
           int currPart = hashCode & spilledState.getPartitionMask();
           hashCode >>>= spilledState.getBitsInMask();
           // semi-join skips join-key-duplicate rows
-          if ( semiJoin ) {
+          if (semiJoin) {
 
           }
-          // Append the new inner row to the appropriate partition; spill (that partition) if needed
-          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc);
+          // Append the new inner row to the appropriate partition; spill (that
+          // partition) if needed
+          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
+              hashCode, buildCalc);
         }
 
-        if ( read_right_HV_vector != null ) {
+        if (read_right_HV_vector != null) {
           read_right_HV_vector.clear();
           read_right_HV_vector = null;
         }
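
The loop above distributes each build row by masking off the low bits of its hash code and then shifting those bits away, so that if this partition later spills, the next cycle can repartition on a fresh slice of bits. A minimal stand-alone illustration of that arithmetic (the values are made up):

// Illustrative only: how a row's partition is chosen from its hash code, and how
// the remaining bits are kept for a later spill cycle.
public class PartitionSelectSketch {
  public static void main(String[] args) {
    int numPartitions = 8;                 // must be a power of two
    int partitionMask = numPartitions - 1; // 0b111
    int bitsInMask = Integer.bitCount(partitionMask);

    int hashCode = 0x5F3A9C71;             // stand-in for getBuildHashCode(ind)
    int firstCyclePartition = hashCode & partitionMask;
    int remainingHash = hashCode >>> bitsInMask; // carried along if the row spills
    int secondCyclePartition = remainingHash & partitionMask;

    System.out.println("cycle 0 partition: " + firstCyclePartition);
    System.out.println("cycle 1 partition: " + secondCyclePartition);
  }
}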
@@ -1061,14 +1151,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     if (spilledState.isFirstCycle() && enableRuntimeFilter) {
       if (bloomFilter2buildId.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields,
+            this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
       }
     }
 
     // Move the remaining current batches into their temp lists, or spill
     // them if the partition is spilled. Add the spilled partitions into
     // the spilled partitions list
-    if ( numPartitions > 1 ) { // a single partition needs no completion
+    if (numPartitions > 1) { // a single partition needs no completion
       for (HashPartition partn : partitions) {
         partn.completeAnInnerBatch(false, partn.isSpilled());
       }
@@ -1077,15 +1168,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     prefetchFirstProbeBatch();
 
     if (leftUpstream.isError()) {
-      // A termination condition was reached while prefetching the first build side data holding batch.
+      // A termination condition was reached while prefetching the first probe
+      // side data holding batch.
       // We need to terminate.
       return leftUpstream;
     }
 
-    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
+    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc
+        .next();
     postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
 
-    //  Traverse all the in-memory partitions' incoming batches, and build their hash tables
+    // Traverse all the in-memory partitions' incoming batches, and build their
+    // hash tables
 
     for (int index = 0; index < partitions.length; index++) {
       HashPartition partn = partitions[index];
@@ -1104,9 +1198,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
           partn.buildContainersHashTableAndHelper();
         }
       } catch (OutOfMemoryException e) {
-        String message = "Failed building hash table on partition " + index + ":\n"
-          + makeDebugString() + "\n"
-          + postBuildCalc.makeDebugString();
+        String message = "Failed building hash table on partition " + index
+            + ":\n" + makeDebugString() + "\n"
+            + postBuildCalc.makeDebugString();
         // Include debug info
         throw new OutOfMemoryException(message, e);
       }
@@ -1117,18 +1211,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     }
 
     for (HashPartition partn : partitions) {
-      if ( partn.isSpilled() ) {
-        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(),
-          partn.getPartitionNum(),
-          originalPartition,
-          partn.getPartitionBatchesCount(),
-          partn.getSpillFile());
+      if (partn.isSpilled()) {
+        HashJoinSpilledPartition sp = new HashJoinSpilledPartition(
+            spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
+            partn.getPartitionBatchesCount(), partn.getSpillFile());
 
         spilledState.addPartition(sp);
-        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
+        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find
+                                                     // the SP later
         partn.closeWriter();
 
-        partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());
+        partn.updateProbeRecordsPerBatch(
+            postBuildCalc.getProbeRecordsPerBatch());
       }
     }
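
Each partition that spilled during the build phase is recorded so a later cycle can replay it as a smaller join. A rough sketch of that bookkeeping, with hypothetical field names and file paths; it mirrors the intent of HashJoinSpilledPartition but is not Drill's class:

import java.util.ArrayDeque;

// Illustrative only: queuing spilled build partitions so a later cycle can
// replay each one as a smaller, recursive join.
public class SpilledPartitionSketch {

  static final class SpilledPartition {
    final int cycle;         // spill cycle in which the partition was written
    final int partitionNum;  // partition index within that cycle
    final String spillFile;  // file holding the partition's build batches

    SpilledPartition(int cycle, int partitionNum, String spillFile) {
      this.cycle = cycle;
      this.partitionNum = partitionNum;
      this.spillFile = spillFile;
    }
  }

  public static void main(String[] args) {
    ArrayDeque<SpilledPartition> pending = new ArrayDeque<>();
    pending.add(new SpilledPartition(0, 3, "/tmp/hashjoin_p3.spill"));
    pending.add(new SpilledPartition(0, 5, "/tmp/hashjoin_p5.spill"));

    while (!pending.isEmpty()) {
      SpilledPartition sp = pending.poll();
      System.out.printf("cycle %d: rebuild hash table for partition %d from %s%n",
          sp.cycle + 1, sp.partitionNum, sp.spillFile);
      // A real implementation would read the file, rebuild the hash table, and
      // might spill again, adding new entries to 'pending'.
    }
  }
}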
 
@@ -1137,12 +1231,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   private void setupOutputContainerSchema() {
 
-    if (buildSchema != null && ! semiJoin ) {
+    if (buildSchema != null && !semiJoin) {
       for (MaterializedField field : buildSchema) {
         MajorType inputType = field.getType();
         MajorType outputType;
-        // If left or full outer join, then the output type must be nullable. However, map types are
-        // not nullable so we must exclude them from the check below (see DRILL-2197).
+        // If left or full outer join, then the output type must be nullable.
+        // However, map types are
+        // not nullable so we must exclude them from the check below (see
+        // DRILL-2197).
         if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
@@ -1150,20 +1246,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
           outputType = inputType;
         }
 
-        // make sure to project field with children for children to show up in the schema
+        // make sure to project field with children for children to show up in
+        // the schema
         MaterializedField projected = field.withType(outputType);
         // Add the vector to our output container
         container.addOrGet(projected);
       }
     }
 
-    if (probeSchema != null) { // a probe schema was seen (even though the probe may had no rows)
+    if (probeSchema != null) { // a probe schema was seen (even though the
+                               // probe may have had no rows)
       for (VectorWrapper<?> vv : probeBatch) {
         MajorType inputType = vv.getField().getType();
         MajorType outputType;
 
-        // If right or full outer join then the output type should be optional. However, map types are
-        // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
+        // If right or full outer join then the output type should be optional.
+        // However, map types are
+        // not nullable so we must exclude them from the check below (see
+        // DRILL-2771, DRILL-2197).
         if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
@@ -1171,7 +1271,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
           outputType = inputType;
         }
 
-        ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
+        ValueVector v = container.addOrGet(
+            MaterializedField.create(vv.getField().getName(), outputType));
         if (v instanceof AbstractContainerVector) {
           vv.getValueVector().makeTransferPair(v);
           v.clear();
@@ -1183,30 +1284,36 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   // (After the inner side was read whole) - Has that inner partition spilled
   public boolean isSpilledInner(int part) {
-    if ( spilledInners == null ) { return false; } // empty inner
+    if (spilledInners == null) {
+      return false;
+    } // empty inner
     return spilledInners[part] != null;
   }
 
   /**
-   *  The constructor
+   * The constructor
    *
    * @param popConfig
    * @param context
-   * @param left  -- probe/outer side incoming input
-   * @param right -- build/iner side incoming input
+   * @param left
+   *          -- probe/outer side incoming input
+   * @param right
+   *          -- build/inner side incoming input
    * @throws OutOfMemoryException
    */
   public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
-      RecordBatch left, /*Probe side record batch*/
-      RecordBatch right /*Build side record batch*/
+      RecordBatch left, /* Probe side record batch */
+      RecordBatch right /* Build side record batch */
   ) throws OutOfMemoryException {
     super(popConfig, context, true, left, right);
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
     semiJoin = popConfig.isSemiJoin();
-    joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
-    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
+    joinIsLeftOrFull = joinType == JoinRelType.LEFT
+        || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT
+        || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     this.isRowKeyJoin = popConfig.isRowKeyJoin();
@@ -1222,63 +1329,81 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     for (int i = 0; i < conditions.size(); i++) {
       SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
+      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment) rightPath
+          .getLastSegment();
       buildJoinColumns.add(nameSegment.getPath());
       String refName = "build_side_" + i;
-      rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
+      rightExpr.add(new NamedExpression(conditions.get(i).getRight(),
+          new FieldReference(refName)));
     }
 
     this.allocator = oContext.getAllocator();
 
-    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
-    if ( numPartitions == 1 ) { //
-      disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
+    numPartitions = (int) context.getOptions()
+        .getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
+    if (numPartitions == 1) { //
+      disableSpilling(
+          "Spilling is disabled due to configuration setting of num_partitions to 1");
     }
 
-    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not
+                                                                 // a power of 2
 
-    long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
+    long memLimit = context.getOptions()
+        .getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
 
     if (memLimit != 0) {
       allocator.setLimit(memLimit);
     }
 
-    RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
-    maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+    RECORDS_PER_BATCH = (int) context.getOptions()
+        .getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
+    maxBatchesInMemory = (int) context.getOptions()
+        .getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
 
-    logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit()));
+    logger.info("Memory limit {} bytes",
+        FileUtils.byteCountToDisplaySize(allocator.getLimit()));
     spillSet = new SpillSet(context, popConfig);
 
-    // Create empty partitions (in the ctor - covers the case where right side is empty)
+    // Create empty partitions (in the ctor - covers the case where right side
+    // is empty)
     partitions = new HashPartition[0];
 
     // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
-    int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
+    int configuredBatchSize = (int) context.getOptions()
+        .getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    double avail_mem_factor = context.getOptions()
+        .getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+    int outputBatchSize = Math.min(configuredBatchSize,
+        Integer.highestOneBit((int) (allocator.getLimit() * avail_mem_factor)));
 
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-      "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
-      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
-
-    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
+        "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
+        configuredBatchSize, allocator.getLimit(), avail_mem_factor,
+        outputBatchSize);
 
+    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left,
+        right, new HashSet<>());
 
     RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
-      configuredBatchSize);
+        configuredBatchSize);
 
-    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
+    enableRuntimeFilter = context.getOptions()
+        .getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER)
+        && popConfig.getRuntimeFilterDef() != null;
   }
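
The constructor above caps the output batch size at the configured value or at a power of two derived from the operator's memory limit, whichever is smaller. A worked example with hypothetical numbers (these are not Drill defaults):

// Illustrative only: the output-batch-size cap, with made-up inputs.
public class OutputBatchSizeSketch {
  public static void main(String[] args) {
    int configuredBatchSize = 16 * 1024 * 1024;   // 16 MB from configuration
    long allocatorLimit = 100 * 1024 * 1024;      // 100 MB operator memory limit
    double availMemFactor = 0.1;                  // at most 10% of it per output batch

    // Round the memory-derived bound down to a power of two, then take the minimum.
    int memoryBound = Integer.highestOneBit((int) (allocatorLimit * availMemFactor));
    int outputBatchSize = Math.min(configuredBatchSize, memoryBound);

    System.out.println(memoryBound);      // 8388608 (8 MB)
    System.out.println(outputBatchSize);  // 8388608
  }
}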
 
   /**
-   * This method is called when {@link HashJoinBatch} closes. It cleans up left over spilled files that are in the spill queue, and closes the
-   * spillSet.
+   * This method is called when {@link HashJoinBatch} closes. It cleans up
+   * leftover spilled files that are in the spill queue, and closes the spillSet.
    */
   private void cleanup() {
-    if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean
-    if ( spillSet.getWriteBytes() > 0 ) {
+    if (buildSideIsEmpty.booleanValue()) {
+      return;
+    } // not set up; nothing to clean
+    if (spillSet.getWriteBytes() > 0) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
-        (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+          (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
     // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
@@ -1290,13 +1415,17 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
       try {
         spillSet.delete(sp.innerSpillFile);
-      } catch(IOException e) {
-        logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile);
+      } catch (IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}",
+            sp.innerSpillFile);
       }
       try { // outer file is added later; may be null if cleaning prematurely
-        if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); }
-      } catch(IOException e) {
-        logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile);
+        if (sp.outerSpillFile != null) {
+          spillSet.delete(sp.outerSpillFile);
+        }
+      } catch (IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}",
+            sp.outerSpillFile);
       }
     }
     // Delete the currently handled (if any) spilled files
@@ -1305,6 +1434,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   /**
    * This creates a string that summarizes the memory usage of the operator.
+   *
    * @return A memory dump string.
    */
   public String makeDebugString() {
@@ -1313,28 +1443,37 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
       String partitionPrefix = "Partition " + partitionIndex + ": ";
       HashPartition hashPartition = partitions[partitionIndex];
-      sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
+      sb.append(partitionPrefix).append(hashPartition.makeDebugString())
+          .append("\n");
     }
 
     return sb.toString();
   }
 
   /**
-   * Updates the {@link HashTable} and spilling stats after the original build side is processed.
+   * Updates the {@link HashTable} and spilling stats after the original build
+   * side is processed.
    *
-   * Note: this does not update all the stats. The cycleNum is updated dynamically in {@link #innerNext()} and the total bytes
-   * written is updated at close time in {@link #cleanup()}.
+   * Note: this does not update all the stats. The cycleNum is updated
+   * dynamically in {@link #innerNext()} and the total bytes written is updated
+   * at close time in {@link #cleanup()}.
    */
   private void updateStats() {
-    if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
-    if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
+    if (buildSideIsEmpty.booleanValue()) {
+      return;
+    } // no stats when the right side is empty
+    if (!spilledState.isFirstCycle()) {
+      return;
+    } // These stats are only for before processing spilled files
 
     HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
-    for ( HashPartition partn : partitions ) {
-      if ( partn.isSpilled() ) { numSpilled++; }
+    for (HashPartition partn : partitions) {
+      if (partn.isSpilled()) {
+        numSpilled++;
+      }
       partn.getStats(newStats);
       htStats.addStats(newStats);
     }
@@ -1344,74 +1483,95 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
+    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
+                                                                    // case no
+                                                                    // spill
     stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
   /**
-   * Get the hash table iterator that is created for the build side of the hash join if
-   * this hash join was instantiated as a row-key join.
-   * @return hash table iterator or null if this hash join was not a row-key join or if it
-   * was a row-key join but the build has not yet completed.
+   * Get the hash table iterator that is created for the build side of the hash
+   * join if this hash join was instantiated as a row-key join.
+   *
+   * @return hash table iterator or null if this hash join was not a row-key
+   *         join or if it was a row-key join but the build has not yet
+   *         completed.
    */
   @Override
   public Pair<ValueVector, Integer> nextRowKeyBatch() {
     if (buildComplete) {
-      // partition 0 because Row Key Join has only a single partition - no spilling
+      // partition 0 because Row Key Join has only a single partition - no
+      // spilling
       Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
       if (pp != null) {
         VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
         ValueVector vv = vw.getValueVector();
         return Pair.of(vv, pp.getRight());
       }
-    } else if(partitions == null && firstOutputBatch) { //if there is data coming to right(build) side in build Schema stage, use it.
+    } else if (partitions == null && firstOutputBatch) { // if there is data
+                                                         // coming to the right
+                                                         // (build) side in the
+                                                         // build-schema stage,
+                                                         // use it.
       firstOutputBatch = false;
-      if ( right.getRecordCount() > 0 ) {
+      if (right.getRecordCount() > 0) {
         VectorWrapper<?> vw = Iterables.get(right, 0);
         ValueVector vv = vw.getValueVector();
-        return Pair.of(vv, right.getRecordCount()-1);
+        return Pair.of(vv, right.getRecordCount() - 1);
       }
     }
     return null;
   }
 
-  @Override    // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public boolean hasRowKeyBatch() {
     return buildComplete;
   }
 
-  @Override   // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public BatchState getBatchState() {
     return state;
   }
 
-  @Override  // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public void setBatchState(BatchState newState) {
     state = newState;
   }
 
   @Override
-  public void killIncoming(boolean sendUpstream) {
+  protected void cancelIncoming() {
     wasKilled = true;
-    probeBatch.kill(sendUpstream);
-    buildBatch.kill(sendUpstream);
+    probeBatch.cancel();
+    buildBatch.cancel();
   }
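
cancelIncoming() above replaces the old kill()-based protocol: the operator asks its children to stop producing, then keeps reading until it sees NONE so that any in-flight batches are released (see the drain loop in NestedLoopJoinBatch further down). A schematic sketch of that pattern using hypothetical types, not Drill classes:

import java.util.Arrays;
import java.util.Iterator;

// Illustrative only: the "cancel, then drain" pattern that replaces kill()/STOP.
public class CancelAndDrainSketch {
  enum Outcome { OK, NONE }

  static final class Upstream {
    private final Iterator<String> batches;
    private boolean cancelled;

    Upstream(String... batches) { this.batches = Arrays.asList(batches).iterator(); }

    void cancel() { cancelled = true; } // politely ask the sender to stop

    Outcome next() {
      if (!batches.hasNext()) {
        return Outcome.NONE;
      }
      String batch = batches.next();
      if (cancelled) {
        System.out.println("discarding in-flight batch " + batch); // release buffers only
      } else {
        System.out.println("processing batch " + batch);
      }
      return Outcome.OK;
    }
  }

  public static void main(String[] args) {
    Upstream right = new Upstream("b1", "b2", "b3");
    right.next();    // consumed normally
    right.cancel();  // e.g. a downstream LIMIT is already satisfied
    while (right.next() != Outcome.NONE) {
      // drain whatever is still in flight
    }
  }
}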
 
   public void updateMetrics() {
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+        batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+        batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalOutputRecords());
   }
 
   @Override
@@ -1427,41 +1587,59 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   @Override
   public void close() {
     if (!spilledState.isFirstCycle()) { // spilling happened
-      // In case closing due to cancellation, BaseRootExec.close() does not close the open
-      // SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
-      killIncoming(false);
+      // In case closing due to cancellation, BaseRootExec.close() does not
+      // close the open
+      // SpilledRecordBatch "scanners" as it only knows about the original
+      // left/right ops.
+      // TODO: Code that was here didn't actually close the "scanners"
     }
 
     updateMetrics();
 
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+        "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+        batchMemoryManager
+            .getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager
+            .getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager
+            .getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager
+            .getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+        "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+        batchMemoryManager
+            .getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager
+            .getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager
+            .getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager
+            .getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
-      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
-      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+        "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+        batchMemoryManager.getNumOutgoingBatches(),
+        batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(),
+        batchMemoryManager.getTotalOutputRecords());
 
     cleanup();
     super.close();
   }
 
   public HashJoinProbe setupHashJoinProbe() {
-    //  No real code generation !!
+    // No real code generation !!
     return new HashJoinProbeTemplate();
   }
 
   @Override
   public void dump() {
-    logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
-            " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
-        joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
+    logger.error(
+        "HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={},"
+            + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]",
+        container, left, right, leftUpstream, rightUpstream, joinType,
+        hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 368bff6..dea15d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -54,12 +54,14 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     this.semiJoin = semiJoin;
   }
 
+  @Override
   public void initialize(boolean doMemoryCalculation) {
     Preconditions.checkState(!initialized);
     initialized = true;
     this.doMemoryCalculation = doMemoryCalculation;
   }
 
+  @Override
   public BuildSidePartitioning next() {
     Preconditions.checkState(initialized);
 
@@ -154,8 +156,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
   }
 
   /**
-   * <h1>Basic Functionality</h1>
-   * <p>
    * At this point we need to reserve memory for the following:
    * <ol>
    *   <li>An incoming batch</li>
@@ -165,7 +165,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
    * If we run out of room and need to start spilling, we need to specify which partitions
    * need to be spilled.
    * </p>
-   * <h1>Life Cycle</h1>
+   * <h4>Life Cycle</h4>
    * <p>
    *   <ul>
    *     <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, double)}.
@@ -189,9 +189,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     private int maxBatchNumRecordsBuild;
     private int maxBatchNumRecordsProbe;
     private long memoryAvailable;
-    private boolean probeEmpty;
     private long maxBuildBatchSize;
-    private long maxProbeBatchSize;
     private long maxOutputBatchSize;
     private int initialPartitions;
     private int partitions;
@@ -307,7 +305,6 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
       this.reserveHash = reserveHash;
       this.keySizes = Preconditions.checkNotNull(keySizes);
       this.memoryAvailable = memoryAvailable;
-      this.probeEmpty = probeEmpty;
       this.buildSizePredictor = buildSizePredictor;
       this.probeSizePredictor = probeSizePredictor;
       this.initialPartitions = initialPartitions;
@@ -348,7 +345,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
     }
 
     /**
-     * This method calculates the amount of memory we need to reserve while partitioning. It also
+     * Calculates the amount of memory we need to reserve while partitioning. It also
      * calculates the size of a partition batch.
      */
     private void calculateMemoryUsage()
@@ -361,13 +358,13 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
         partitionProbeBatchSize = probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);
       }
 
-      maxOutputBatchSize = (long) ((double)outputBatchSize * fragmentationFactor * safetyFactor);
+      maxOutputBatchSize = (long) (outputBatchSize * fragmentationFactor * safetyFactor);
 
       long probeReservedMemory = 0;
 
       for (partitions = initialPartitions;; partitions /= 2) {
         // The total amount of memory to reserve for incomplete batches across all partitions
-        long incompletePartitionsBatchSizes = ((long) partitions) * partitionBuildBatchSize;
+        long incompletePartitionsBatchSizes = (partitions) * partitionBuildBatchSize;
         // We need to reserve all the space for incomplete batches, and the incoming batch as well as the
         // probe batch we sniffed.
         reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize;
@@ -448,7 +445,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
 
       if (reserveHash) {
         // Include the hash sizes for the batch
-        consumedMemory += ((long) IntVector.VALUE_WIDTH) * partitionStatsSet.getNumInMemoryRecords();
+        consumedMemory += (IntVector.VALUE_WIDTH) * partitionStatsSet.getNumInMemoryRecords();
       }
 
       consumedMemory += RecordBatchSizer.multiplyByFactor(partitionStatsSet.getConsumedMemory(), fragmentationFactor);
@@ -541,14 +538,13 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
   }
 
   /**
-   * <h1>Basic Functionality</h1>
    * <p>
    *   In this state, we need to make sure there is enough room to spill probe side batches, if
    *   spilling is necessary. If there is not enough room, we have to evict build side partitions.
    *   If we don't have to evict build side partitions in this state, then we are done. If we do have
    *   to evict build side partitions then we have to recursively repeat the process.
    * </p>
-   * <h1>Lifecycle</h1>
+   * <h4>Lifecycle</h4>
    * <p>
    *   <ul>
    *     <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index d40f6a3..79956a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -248,7 +248,6 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
         switch (leftUpstream) {
           case NONE:
           case NOT_YET:
-          case STOP:
             recordsProcessed = 0;
             recordsToProcess = 0;
             changeToFinalProbeState();
@@ -286,6 +285,8 @@ public class HashJoinProbeTemplate implements HashJoinProbe {
             if (cycleNum > 0) {
               read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
             }
+            break;
+          default:
         }
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
index 4036438..14f9305 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinStateCalculator.java
@@ -23,7 +23,7 @@ import javax.annotation.Nullable;
  * A {@link HashJoinStateCalculator} is a piece of code that compute the memory requirements for one of the states
  * in the {@link HashJoinState} enum.
  */
-public interface HashJoinStateCalculator<T extends HashJoinStateCalculator> {
+public interface HashJoinStateCalculator<T extends HashJoinStateCalculator<?>> {
   /**
    * Signifies that the current state is complete and returns the next {@link HashJoinStateCalculator}.
    * Returns null in the case where there is no next state.
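
The interface documented here chains per-state calculators: each one computes the memory requirements of its state and hands off to the next, returning null when the chain ends. A toy illustration of the pattern; the states and classes below are hypothetical, not Drill's HashJoinState machinery:

// Illustrative only: the chained "state calculator" pattern with made-up states.
public class StateCalculatorSketch {

  interface StateCalculator<T extends StateCalculator<?>> {
    T next(); // null when there is no next state
  }

  static final class ProbeCalc implements StateCalculator<ProbeCalc> {
    @Override public ProbeCalc next() { return null; } // terminal state
  }

  static final class BuildCalc implements StateCalculator<ProbeCalc> {
    @Override public ProbeCalc next() { return new ProbeCalc(); }
  }

  public static void main(String[] args) {
    StateCalculator<?> calc = new BuildCalc();
    while (calc != null) {
      System.out.println("computing memory needs in state: "
          + calc.getClass().getSimpleName());
      calc = calc.next();
    }
  }
}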
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 503ff02..08b7c09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.core.JoinRelType;
  * Maintain join state.
  */
 public final class JoinStatus {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
 
   public final RecordIterator left;
   public final RecordIterator right;
@@ -142,10 +141,6 @@ public final class JoinStatus {
    *  4. JoinOutcome.SCHEMA_CHANGED : one of the side has change in schema.
    */
   public JoinOutcome getOutcome() {
-    // on STOP, OUT_OF_MEMORY return FAILURE.
-    if (!ok || eitherMatches(IterOutcome.STOP)) {
-      return JoinOutcome.FAILURE;
-    }
     if (hasMoreData) {
       return JoinOutcome.BATCH_RETURNED;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index c2a2b56..7e453c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -56,7 +56,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
  * RecordBatch implementation for the lateral join operator. Currently it's
@@ -352,7 +351,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // EMIT outcome is not expected as part of first batch from either side
     if (leftUpstream == EMIT || rightUpstream == EMIT) {
-      state = BatchState.STOP;
       throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
         "buildSchema phase");
     }
@@ -392,14 +390,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    this.left.kill(sendUpstream);
-    // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming
-    // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream
-    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to
-    // UNNEST.
-    leftUpstream = STOP;
-    this.right.kill(sendUpstream);
+  protected void cancelIncoming() {
+    left.cancel();
+    right.cancel();
   }
 
   /* ****************************************************************************************************************
@@ -417,7 +410,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   }
 
   private boolean isTerminalOutcome(IterOutcome outcome) {
-    return (outcome == STOP || outcome == NONE);
+    return outcome == NONE;
   }
 
   /**
@@ -483,7 +476,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           }
           break;
         case NONE:
-        case STOP:
           // Not using =0 since if outgoing container is empty then no point returning anything
           if (outputIndex > 0) { // can only reach here from produceOutputBatch
             processLeftBatchInFuture = true;
@@ -554,7 +546,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           needNewRightBatch = false;
           break;
         case NONE:
-        case STOP:
           needNewRightBatch = false;
           break;
         case NOT_YET:
@@ -934,10 +925,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
   private boolean setBatchState(IterOutcome outcome) {
     switch(outcome) {
-      case STOP:
       case EMIT:
-        state = BatchState.STOP;
-        return false;
       case NONE:
       case NOT_YET:
         state = BatchState.DONE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 6eeacd8..f4ec939 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -238,8 +238,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
         case FAILURE:
           status.left.clearInflightBatches();
           status.right.clearInflightBatches();
-          kill(false);
-          return IterOutcome.STOP;
+          // Should handle at the source of the error to provide a better error message.
+          throw UserException.executionError(null)
+              .message("Merge failed")
+              .build(logger);
         case NO_MORE_DATA:
           logger.debug("NO MORE DATA; returning {}",
             (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" : "NONE")));
@@ -299,12 +301,6 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     rightIterator.close();
   }
 
-  @Override
-  protected void killIncoming(boolean sendUpstream) {
-    left.kill(sendUpstream);
-    right.kill(sendUpstream);
-  }
-
   private JoinWorker generateNewWorker() {
     final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
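
For illustration, a minimal sketch of the fail-fast convention that replaces the old kill-and-return-STOP path shown in the MergeJoinBatch hunk above. The class and method names here are placeholders; only UserException and its builder calls come from the patch itself.

    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch only: operators now raise the error where it occurs instead of
    // returning IterOutcome.STOP and relying on downstream operators to unwind.
    class FailFastSketch {
      private static final Logger logger = LoggerFactory.getLogger(FailFastSketch.class);

      void mergeStep() {
        try {
          doMerge();                              // placeholder for the real merge work
        } catch (RuntimeException e) {
          // Before DRILL-7583: kill(false); return IterOutcome.STOP;
          throw UserException.executionError(e)   // builder used in the hunk above
              .message("Merge failed")
              .build(logger);
        }
      }

      private void doMerge() { }                  // hypothetical worker call
    }
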
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 38f7aa0..a55ecd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -177,8 +177,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
             addBatchToHyperContainer(right);
             break;
           case NONE:
-          case STOP:
-            //TODO we got a STOP, shouldn't we stop immediately ?
           case NOT_YET:
             drainRight = false;
             break;
@@ -211,7 +209,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     if (!hasMore(rightUpstream)) {
       return;
     }
-    right.kill(true);
+    right.cancel();
     while (hasMore(rightUpstream)) {
       VectorAccessibleUtilities.clear(right);
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
@@ -446,9 +444,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    this.left.kill(sendUpstream);
-    this.right.kill(sendUpstream);
+  protected void cancelIncoming() {
+    left.cancel();
+    right.cancel();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index bc8c53b..3f48264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -87,11 +87,6 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
 
     rightUpstream = next(right);
 
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return;
-    }
-
     if (right.getRecordCount() > 0) {
       // set the hasRowKeyBatch flag such that calling next() on the left input
       // would see the correct status
@@ -138,7 +133,6 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
 
       switch(rightUpstream) {
       case NONE:
-      case STOP:
         rkJoinState = RowKeyJoinState.DONE;
         state = BatchState.DONE;
         return rightUpstream;
@@ -269,9 +263,9 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
   }
 
   @Override
-  public void killIncoming(boolean sendUpstream) {
-    left.kill(sendUpstream);
-    right.kill(sendUpstream);
+  protected void cancelIncoming() {
+    left.cancel();
+    right.cancel();
   }
 
   @Override
@@ -285,5 +279,4 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
     logger.error("RowKeyJoinBatch[container={}, left={}, right={}, hasRowKeyBatch={}, rkJoinState={}]",
         container, left, right, hasRowKeyBatch, rkJoinState);
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index bb5b38d..f698096 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
@@ -60,15 +61,13 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   public IterOutcome innerNext() {
     if (!first && !needMoreRecords(numberOfRecords)) {
       outgoingSv.setRecordCount(0);
-      incoming.kill(true);
-
+      incoming.cancel();
       IterOutcome upStream = next(incoming);
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
         // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
+        VectorAccessibleUtilities.clear(incoming);
+
         // clear memory for incoming sv (if any)
         if (incomingSv != null) {
           incomingSv.clear();
@@ -78,9 +77,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
       if (upStream == EMIT) {
         // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
+        VectorAccessibleUtilities.clear(incoming);
 
         // clear memory for incoming sv (if any)
         if (incomingSv != null) {
@@ -139,15 +136,16 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
       return true;
     } else {
-     return false;
+      return false;
     }
   }
 
   /**
-   * Gets the outcome to return from super implementation and then in case of EMIT outcome it refreshes the state of
-   * operator. Refresh is done to again apply limit on all the future incoming batches which will be part of next
+   * Gets the outcome to return from super implementation and then in case of
+   * EMIT outcome it refreshes the state of operator. Refresh is done to again
+   * apply limit on all the future incoming batches which will be part of next
    * record boundary.
-   * @param hasRemainder
+   *
    * @return - IterOutcome to send downstream
    */
   @Override
@@ -201,10 +199,13 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   /**
-   * limit call when incoming batch has number of records more than the start offset such that it can produce some
-   * output records. After first call of this method recordStartOffset should be 0 since we have already skipped the
+   * limit call when incoming batch has number of records more than the start
+   * offset such that it can produce some output records. After first call of
+   * this method recordStartOffset should be 0 since we have already skipped the
    * required number of records as part of first incoming record batch.
-   * @param inputRecordCount - number of records in incoming batch
+   *
+   * @param inputRecordCount
+   *          number of records in incoming batch
    */
   private void limit(int inputRecordCount) {
     int endRecordIndex;
@@ -240,8 +241,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   /**
-   * Method which returns if more output records are needed from LIMIT operator. When numberOfRecords is set to
-   * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so get all the records past start offset.
+   * Method which returns if more output records are needed from LIMIT operator.
+   * When numberOfRecords is set to {@link Integer#MIN_VALUE} that means there
+   * is no end bound on LIMIT, so get all the records past start offset.
+   *
    * @return - true - more output records is expected.
    *           false - limit bound is reached and no more record is expected
    */
@@ -261,8 +264,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   /**
-   * Reset the states for recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
-   * This method is called for the outcome EMIT no matter if limit is reached or not.
+   * Reset the states for recordStartOffset and numberOfRecords based on the
+   * popConfig passed to the operator. This method is called for the outcome
+   * EMIT no matter if limit is reached or not.
    */
   private void refreshLimitState() {
     // Make sure startOffset is non-negative
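
For illustration, a minimal sketch of the "normal case" cancellation that LIMIT now performs (see the innerNext() hunk above): once enough rows have been produced it cancels the upstream operator and drains any batches already in flight. The class and method names here are placeholders; cancel(), next() and VectorAccessibleUtilities.clear() are the calls used in the patch.

    import org.apache.drill.exec.record.RecordBatch;
    import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    import org.apache.drill.exec.record.VectorAccessibleUtilities;

    // Sketch only: benign early termination, not an error path.
    class LimitCancelSketch {
      void stopEarly(RecordBatch incoming) {
        incoming.cancel();                            // ask upstream to stop producing
        IterOutcome up = incoming.next();
        while (up == IterOutcome.OK || up == IterOutcome.OK_NEW_SCHEMA) {
          VectorAccessibleUtilities.clear(incoming);  // release buffers of drained batches
          up = incoming.next();
        }
      }
    }
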
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ecac4ef..d25e4bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -531,12 +531,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void kill(final boolean sendUpstream) {
-    if (sendUpstream) {
-      informSenders();
-    } else {
-      close();
-    }
+  public void cancel() {
+    informSenders();
 
     for (final RawFragmentBatchProvider provider : fragProviders) {
       provider.kill(context);
@@ -587,9 +583,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  protected void killIncoming(final boolean sendUpstream) {
-    //No op
-  }
+  protected void cancelIncoming() { }
 
   private void checkSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
     Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 49dc42e..7314961 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -163,7 +163,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
           finishedLeft = !firstLeft;
           break outer;
         case NOT_YET:
-        case STOP:
           return outcome;
         case OK_NEW_SCHEMA:
           if (firstLeft) {
@@ -209,7 +208,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
           finishedRight = true;
           break outer;
         case NOT_YET:
-        case STOP:
           return outcome;
         case OK_NEW_SCHEMA:
           firstRight = false;
@@ -736,12 +734,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    left.kill(sendUpstream);
-    right.kill(sendUpstream);
-  }
-
-  @Override
   public void dump() {
     logger.error("MetadataHandlerBatch[container={}, popConfig={}]", container, popConfig);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 600a170..6c7d4df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -69,7 +69,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
  * Responsible for handling metadata returned by incoming aggregate operators
@@ -125,7 +124,6 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
         doWorkInternal();
         // fall thru
       case NOT_YET:
-      case STOP:
         return outcome;
       default:
         throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
@@ -135,7 +133,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
   @Override
   public IterOutcome innerNext() {
     IterOutcome outcome = getLastKnownOutcome();
-    if (outcome != NONE && outcome != STOP) {
+    if (outcome != NONE) {
       outcome = super.innerNext();
     }
     // if incoming is exhausted, reads metadata which should be obtained from the Metastore
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index 25af95a..e9e7bf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -32,18 +32,16 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.IntVector;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class OrderedPartitionProjectorTemplate implements OrderedPartitionProjector {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionProjectorTemplate.class);
+  static final Logger logger = LoggerFactory.getLogger(OrderedPartitionProjectorTemplate.class);
 
   private ImmutableList<TransferPair> transfers;
-//  private VectorContainer partitionVectors;
   private int partitions;
-//  private SelectionVector2 vector2;
-//  private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
   private RecordBatch outBatch;
-//  private SchemaPath outputField;
   private IntVector partitionValues;
 
   public OrderedPartitionProjectorTemplate() throws SchemaChangeException{
@@ -83,12 +81,12 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
     this.outBatch = outgoing;
-//    this.outputField = outputField;
     partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector();
     switch(svMode){
     case FOUR_BYTE:
     case TWO_BYTE:
       throw new UnsupportedOperationException("Selection vector not supported");
+    default:
     }
     this.transfers = ImmutableList.copyOf(transfers);
     this.partitions = partitions;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index f52554c..37b8826 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -135,6 +135,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   private final String mapKey;
   private List<VectorContainer> sampledIncomingBatches;
 
+  @SuppressWarnings("null")
   public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(pop, context);
     this.incoming = incoming;
@@ -186,7 +187,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         switch (upstream) {
         case NONE:
         case NOT_YET:
-        case STOP:
           upstreamNone = true;
           break outer;
         default:
@@ -441,8 +441,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
@@ -504,7 +504,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     switch (upstream) {
     case NONE:
     case NOT_YET:
-    case STOP:
       close();
       recordCount = 0;
       return upstream;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
index 4ce10c8..36baad3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
@@ -23,12 +23,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class SampleCopierTemplate implements SampleCopier {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SampleCopierTemplate.class);
+  static final Logger logger = LoggerFactory.getLogger(SampleCopierTemplate.class);
 
   private SelectionVector4 sv4;
-  private int outputRecords = 0;
+  private int outputRecords;
 
   @Override
   public void setupCopier(FragmentContext context, SelectionVector4 sv4, VectorAccessible incoming, VectorAccessible outgoing)
@@ -42,10 +44,8 @@ public abstract class SampleCopierTemplate implements SampleCopier {
     return outputRecords;
   }
 
-
   @Override
   public boolean copyRecords(int skip, int start, int total) {
-    final int recordCount = sv4.getCount();
     int outgoingPosition = 0;
     int increment = skip > 0 ? skip : 1;
     for(int svIndex = start; svIndex < sv4.getCount() && outputRecords < total; svIndex += increment, outgoingPosition++){
@@ -60,7 +60,4 @@ public abstract class SampleCopierTemplate implements SampleCopier {
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3b7f78a..d2eea15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -74,7 +75,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   private final AtomicIntegerArray remainingReceivers;
   private final AtomicInteger remaingReceiverCount;
-  private boolean done = false;
+  private boolean done;
   private boolean first = true;
   private final boolean closeIncoming;
 
@@ -153,7 +154,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     if (!done) {
       out = next(incoming);
     } else {
-      incoming.kill(true);
+      incoming.cancel();
       out = IterOutcome.NONE;
     }
 
@@ -171,15 +172,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
             sendEmptyBatch(true);
           }
         } catch (ExecutionException e) {
-          incoming.kill(false);
-          logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
-          context.getExecutorState().fail(e.getCause());
-        }
-        return false;
-
-      case STOP:
-        if (partitioner != null) {
-          partitioner.clear();
+          throw UserException.dataWriteError(e)
+            .addContext("Error while creating partitioning sender or flushing outgoing batches")
+            .build(logger);
         }
         return false;
 
@@ -198,18 +193,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
             sendEmptyBatch(false);
           }
         } catch (ExecutionException e) {
-          incoming.kill(false);
-          logger.error("Error while flushing outgoing batches", e);
-          context.getExecutorState().fail(e.getCause());
-          return false;
+          throw UserException.dataWriteError(e)
+            .addContext("Error while flushing outgoing batches")
+            .build(logger);
         }
       case OK:
         try {
           partitioner.partitionBatch(incoming);
         } catch (ExecutionException e) {
-          context.getExecutorState().fail(e.getCause());
-          incoming.kill(false);
-          return false;
+          throw UserException.dataWriteError(e)
+            .addContext("Error while partitioning outgoing batches")
+            .build(logger);
         }
         VectorAccessibleUtilities.clear(incoming);
         return true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 8c8cc54..5e1ffd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -50,7 +50,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
   private final BlockingDeque<RecordBatchDataWrapper> queue;
   private int recordCount;
   private BatchSchema schema;
-  private boolean stop;
   private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
 
   protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
@@ -77,8 +76,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
     }
     if (wrapper.finished) {
       return IterOutcome.NONE;
-    } else if (wrapper.failed) {
-      return IterOutcome.STOP;
     }
 
     recordCount = wrapper.batch.getRecordCount();
@@ -119,10 +116,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
 
     @Override
     public void run() {
+      boolean stop = false;
       try {
-        if (stop) {
-          return;
-        }
         outer:
         while (true) {
           final IterOutcome upstream = incoming.next();
@@ -130,9 +125,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
             case NONE:
               stop = true;
               break outer;
-            case STOP:
-              queue.putFirst(RecordBatchDataWrapper.failed());
-              return;
             case OK_NEW_SCHEMA:
             case OK:
               wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming, oContext.getAllocator()));
@@ -174,9 +166,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
   }
 
   @Override
-  protected void killIncoming(final boolean sendUpstream) {
-    stop = true;
-  }
+  protected void cancelIncoming() { }
 
   @Override
   public void close() {
@@ -207,30 +197,24 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
   private static class RecordBatchDataWrapper {
     final RecordBatchData batch;
     final boolean finished;
-    final boolean failed;
 
-    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
+    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished) {
       this.batch = batch;
       this.finished = finished;
-      this.failed = failed;
     }
 
     public static RecordBatchDataWrapper batch(final RecordBatchData batch) {
-      return new RecordBatchDataWrapper(batch, false, false);
+      return new RecordBatchDataWrapper(batch, false);
     }
 
     public static RecordBatchDataWrapper finished() {
-      return new RecordBatchDataWrapper(null, true, false);
-    }
-
-    public static RecordBatchDataWrapper failed() {
-      return new RecordBatchDataWrapper(null, false, true);
+      return new RecordBatchDataWrapper(null, true);
     }
   }
 
   @Override
   public void dump() {
-    logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]",
-        container, recordCount, schema, stop);
+    logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}]",
+        container, recordCount, schema);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 5b2bdc5..a4613a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -68,8 +68,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    super.killIncoming(sendUpstream);
+  protected void cancelIncoming() {
+    super.cancelIncoming();
     hasRemainder = false;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index b3ca591..9a96909 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -138,7 +138,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void cancel() {
     driver.cancel();
   }
 
@@ -164,11 +164,6 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public boolean hasFailed() {
-    return lastOutcome == IterOutcome.STOP;
-  }
-
-  @Override
   public void dump() {
     logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 56c6246..5e859f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -69,11 +69,6 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
-  }
-
-  @Override
   public int getRecordCount() {
     return recordCount;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 4b2cfc8..6cc723c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -35,7 +35,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
  * Holds the data for a particular record batch for later manipulation.
  */
 public class RecordBatchData {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
 
   private SelectionVector2 sv2;
   private int recordCount;
@@ -72,7 +71,7 @@ public class RecordBatchData {
 
   public List<ValueVector> getVectors() {
     List<ValueVector> vectors = Lists.newArrayList();
-    for (VectorWrapper w : container) {
+    for (VectorWrapper<?> w : container) {
       vectors.add(w.getValueVector());
     }
     return vectors;
@@ -98,5 +97,4 @@ public class RecordBatchData {
     }
     container.clear();
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 98f6327..747959f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -107,8 +107,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
         break outer;
       case NOT_YET:
         throw new UnsupportedOperationException();
-      case STOP:
-        return upstream;
       case OK_NEW_SCHEMA:
         // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
         if (!incoming.getSchema().equals(schema)) {
@@ -205,8 +203,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 7a88dc9..967209b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -342,7 +342,6 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
         case NONE:
           break outer;
         case NOT_YET:
-        case STOP:
           return outcome;
         case OK_NEW_SCHEMA:
           if (first) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 63bcb86..2039e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -84,12 +84,6 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    left.kill(sendUpstream);
-    right.kill(sendUpstream);
-  }
-
-  @Override
   protected void buildSchema() {
     if (! prefetchFirstBatchFromBothSides()) {
       state = BatchState.DONE;
@@ -377,9 +371,6 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
               batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
               getRecordBatchStatsContext());
             return Pair.of(outcome, topStatus);
-          case STOP:
-            batchStatusStack.pop();
-            return Pair.of(outcome, topStatus);
           case NONE:
             batchStatusStack.pop();
             if (batchStatusStack.isEmpty()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 65d66ce..5e6ef47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -145,7 +145,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void cancelIncoming() {
     //
     // In some cases we need to return a predetermined state from a call to next. These are:
     // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
@@ -154,11 +154,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     // special handling is needed in that case.
     //
     Preconditions.checkNotNull(lateral);
-    // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming
-    Preconditions.checkState(context.getExecutorState().isFailed() ||
-      lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
-      "Neither the LateralOutcome is STOP nor executor state is failed");
-      logger.debug("Kill received. Stopping all processing");
+    logger.debug("Cancel received. Stopping all processing");
     state = BatchState.DONE;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
   }
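
The deleted precondition above encoded the old STOP-based handshake between LATERAL and UNNEST. After this change the division of labour is simpler: LateralJoinBatch cancels both of its children, while UnnestRecordBatch only shuts down its own processing, since the shared incoming is owned (and cancelled) by LATERAL. A condensed sketch, with illustrative class names:

    import org.apache.drill.exec.record.RecordBatch;

    // Sketch only: cancel responsibilities after DRILL-7583.
    class LateralCancelSketch {
      RecordBatch left, right;
      void cancelIncoming() {
        left.cancel();                 // LATERAL owns both children
        right.cancel();
      }
    }

    class UnnestCancelSketch {
      boolean hasRemainder = true;
      void cancelIncoming() {
        // deliberately no incoming.cancel(); LATERAL has already cancelled it
        hasRemainder = false;          // stop processing the current row
      }
    }
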
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index b94c551..bfbad64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -118,10 +118,8 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    if (sendUpstream) {
-      informSenders();
-    }
+  public void cancel() {
+    informSenders();
     fragProvider.kill(context);
   }
 
@@ -293,9 +291,4 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   public void dump() {
     logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema);
   }
-
-  @Override
-  public boolean hasFailed() {
-    return lastOutcome == IterOutcome.STOP;
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 66e8b01..1fb44f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -112,7 +112,6 @@ public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMap
     switch (upStream) {
       case NONE:
       case NOT_YET:
-      case STOP:
         return upStream;
       case OK_NEW_SCHEMA:
         first = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 7e795dd..e857364 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.validate;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 import java.util.Iterator;
 
@@ -120,7 +119,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
     this(incoming, false);
   }
 
-
   public void enableBatchValidation(boolean option) {
     validateBatches = option;
   }
@@ -184,8 +182,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  public void cancel() {
+    incoming.cancel();
   }
 
   @Override
@@ -227,7 +225,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                 instNum, batchTypeName, exceptionState, batchState));
       }
       // (Note:  This could use validationState.)
-      if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
+      if (!isRepeatable && batchState == NONE) {
         throw new IllegalStateException(
             String.format(
                 "next() [on #%d, %s] called again after it returned %s."
@@ -271,12 +269,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
             validationState = ValidationState.TERMINAL;
           }
           break;
-        case STOP:
-          // STOP is allowed at any time, except if already terminated (checked
-          // above).
-          // STOP moves to terminal high-level state.
-          validationState = ValidationState.TERMINAL;
-          break;
         case NOT_YET:
           // NOT_YET is allowed at any time, except if
           // terminated (checked above).
@@ -286,7 +278,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
           throw new AssertionError(
               "Unhandled new " + IterOutcome.class.getSimpleName() + " value "
               + batchState);
-          //break;
       }
 
       // Validate schema when available.
@@ -385,11 +376,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   public RecordBatch getIncoming() { return incoming; }
 
   @Override
-  public boolean hasFailed() {
-    return exceptionState != null || batchState == STOP;
-  }
-
-  @Override
   public void dump() {
     logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
            + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index ba8c06b..5873779 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -66,8 +66,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   private boolean noMoreBatches; // true when downstream returns NONE
   private BatchSchema schema;
 
-  private boolean shouldStop; // true if we received an early termination request
-
   public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context,
       RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -88,21 +86,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       return IterOutcome.NONE;
     }
 
-    if (shouldStop) {
-      if (!noMoreBatches) {
-        IterOutcome upstream = next(incoming);
-        while (upstream == IterOutcome.OK || upstream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          upstream = next(incoming);
-        }
-      }
-
-      return IterOutcome.NONE;
-    }
-
     // keep saving incoming batches until the first unprocessed batch can be
     // processed, or upstream == NONE
     while (!noMoreBatches && !canDoWork()) {
@@ -114,7 +97,6 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
           noMoreBatches = true;
           break;
         case NOT_YET:
-        case STOP:
           cleanup();
           return upstream;
         case OK_NEW_SCHEMA:
@@ -228,11 +210,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       state = BatchState.DONE;
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
       return;
-    case STOP:
-      state = BatchState.STOP;
-      return;
     default:
-      break;
     }
 
     try {
@@ -405,9 +383,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    shouldStop = true;
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 17a634d..42ea36c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -21,7 +21,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
@@ -333,9 +332,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       container.buildSchema(SelectionVectorMode.NONE);
       container.setRecordCount(0);
       break;
-    case STOP:
-      state = BatchState.STOP;
-      break;
     case NONE:
       state = BatchState.DONE;
       break;
@@ -408,10 +404,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         // all batches have been read at this record boundary
         break loop;
 
-      case STOP:
-        // Something went wrong.
-        return STOP;
-
       default:
         break;
       }
@@ -450,7 +442,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     switch (lastKnownOutcome) {
     case NONE:
-    case STOP:
       return lastKnownOutcome;
 
     case OK_NEW_SCHEMA:
@@ -499,8 +490,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    incoming.cancel();
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MergeSortWrapper.java
index 01c0f0b..104e3fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MergeSortWrapper.java
@@ -266,7 +266,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
     } else {
       int index = 0;
       for (VectorWrapper<?> w : inputDataContainer) {
-        HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
+        HyperVectorWrapper<?> wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
         wrapper.updateVectorList(w.getValueVectors());
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index b2b4d4c..2e5881b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -631,6 +631,7 @@ public abstract class TupleState extends ContainerState
       super(events, vectorCache, projectionSet);
     }
 
+    @Override
     public void bindColumnState(ColumnState colState) {
       super.bindColumnState(colState);
       writer().bindListener(this);
@@ -701,6 +702,7 @@ public abstract class TupleState extends ContainerState
       this.offsets = offsets;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public T vector() {
       return vector;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ee10a15..de8f499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -31,15 +31,17 @@ import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeLongValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.exec.server.options.TypeValidators.MinRangeDoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.MaxRangeDoubleValidator;
 import org.apache.calcite.plan.Context;
 
 public class PlannerSettings implements Context{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
+  private static final Logger logger = LoggerFactory.getLogger(PlannerSettings.class);
 
-  private int numEndPoints = 0;
-  private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
+  private int numEndPoints;
+  private boolean useDefaultCosting; // True: use default Optiq costing, False: use Drill costing
   private boolean forceSingleMode;
 
   public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE;
@@ -48,21 +50,26 @@ public class PlannerSettings implements Context{
   // initial off heap memory allocation (1M)
   private static final long INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
   // default off heap memory for planning (256M)
+  @SuppressWarnings("unused")
   private static final long DEFAULT_MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 256 * 1024 * 1024;
   // max off heap memory for planning (16G)
   private static final long MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16l * 1024 * 1024 * 1024;
 
   public static final OptionValidator CONSTANT_FOLDING = new BooleanValidator("planner.enable_constant_folding",
       new OptionDescription("If one side of a filter condition is a constant expression, constant folding evaluates the expression in the planning phase and replaces the expression with the constant value. For example, Drill can rewrite WHERE age + 5 < 42 as WHERE age < 37."));
-  public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges",
+  public static final String DISABLE_EXCHANGE_OPTION = "planner.disable_exchanges";
+  public static final OptionValidator EXCHANGE = new BooleanValidator(DISABLE_EXCHANGE_OPTION,
       new OptionDescription("Toggles the state of hashing to a random exchange."));
-  public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg",
+  public static final String ENABLE_HASH_AGG_OPTION = "planner.enable_hashagg";
+  public static final OptionValidator HASHAGG = new BooleanValidator(ENABLE_HASH_AGG_OPTION,
       new OptionDescription("Enable hash aggregation; otherwise, Drill does a sort-based aggregation. Writes to disk. Enable is recommended."));
-  public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg",
+  public static final String ENABLE_STREAM_AGG_OPTION = "planner.enable_streamagg";
+  public static final OptionValidator STREAMAGG = new BooleanValidator(ENABLE_STREAM_AGG_OPTION,
       new OptionDescription("Sort-based operation. Writes to disk."));
   public static final OptionValidator TOPN = new BooleanValidator("planner.enable_topn",
       new OptionDescription("Generates the topN plan for queries with the ORDER BY and LIMIT clauses."));
-  public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin",
+  public static final String ENABLE_HASH_JOIN_OPTION = "planner.enable_hashjoin";
+  public static final OptionValidator HASHJOIN = new BooleanValidator(ENABLE_HASH_JOIN_OPTION,
       new OptionDescription("Enable the memory hungry hash join. Drill assumes that a query will have adequate memory to complete and tries to use the fastest operations possible to complete the planned inner, left, right, or full outer joins using a hash table. Does not write to disk. Disabling hash join allows Drill to manage arbitrarily large data in a small memory footprint."));
   public static final OptionValidator SEMIJOIN = new BooleanValidator("planner.enable_semijoin",
           new OptionDescription("Enable the semi join optimization. Planner removes the distinct processing below the hash join and sets the semi join flag in hash join."));
@@ -481,6 +488,7 @@ public class PlannerSettings implements Context{
     return options.getOption(STATISTICS_MULTICOL_NDV_ADJUST_FACTOR);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){
@@ -489,6 +497,4 @@ public class PlannerSettings implements Context{
       return null;
     }
   }
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 49cbb46..c426dd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -74,10 +74,6 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
   }
 
   protected boolean verifyOutcomeToSetBatchState(IterOutcome leftOutcome, IterOutcome rightOutcome) {
-    if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return false;
-    }
 
     if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
       state = BatchState.DONE;
@@ -86,7 +82,6 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
 
     // EMIT outcome is not expected as part of first batch from either side
     if (leftOutcome == IterOutcome.EMIT || rightOutcome == IterOutcome.EMIT) {
-      state = BatchState.STOP;
       throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
         "buildSchema phase");
     }
@@ -135,4 +130,10 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
   }
+
+  @Override
+  protected void cancelIncoming() {
+    left.cancel();
+    right.cancel();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 3a54c5f..6e36303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -88,8 +88,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     FIRST,
     /** The first data batch has already been returned. */
     NOT_FIRST,
-    /** The query most likely failed, we need to propagate STOP to the root. */
-    STOP,
     /** All work is done, no more data to be sent. */
     DONE
   }
@@ -141,7 +139,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
         logger.debug("Number of records in received batch: {}", b.getRecordCount());
         break;
       default:
-        break;
     }
 
     return next;
@@ -158,9 +155,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
             case DONE:
               lastOutcome = IterOutcome.NONE;
               break;
-            case STOP:
-              lastOutcome = IterOutcome.STOP;
-              break;
             default:
               state = BatchState.FIRST;
               lastOutcome = IterOutcome.OK_NEW_SCHEMA;
@@ -196,11 +190,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected void buildSchema() { }
 
   @Override
-  public void kill(boolean sendUpstream) {
-    killIncoming(sendUpstream);
+  public void cancel() {
+    cancelIncoming();
   }
 
-  protected abstract void killIncoming(boolean sendUpstream);
+  protected abstract void cancelIncoming();
 
   @Override
   public void close() {
@@ -244,11 +238,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return container;
   }
 
-  @Override
-  public boolean hasFailed() {
-    return lastOutcome == IterOutcome.STOP;
-  }
-
   public RecordBatchStatsContext getRecordBatchStatsContext() {
     return batchStatsContext;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index 5997a34..926ae3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -44,9 +44,8 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
   protected abstract RecordBatch getIncoming();
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
-    final RecordBatch incoming = getIncoming();
-    incoming.kill(sendUpstream);
+  protected void cancelIncoming() {
+    getIncoming().cancel();
   }
 
   @Override
@@ -80,7 +79,6 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
         }
         return upstream;
       case NOT_YET:
-      case STOP:
         if (state == BatchState.FIRST) {
           container.buildSchema(SelectionVectorMode.NONE);
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
index 16c6383..f882731 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
@@ -41,6 +41,7 @@ public class MajorTypeSerDe {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MajorTypeSerDe.class);
 
 
+  @SuppressWarnings("serial")
   public static class De extends StdDeserializer<MajorType> {
 
     public De() {
@@ -55,6 +56,7 @@ public class MajorTypeSerDe {
 
   }
 
+  @SuppressWarnings("serial")
   public static class Se extends StdSerializer<MajorType> {
 
     public Se() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index bdb8341..5306dce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -86,9 +86,10 @@ public interface RecordBatch extends VectorAccessible {
    * </p>
    * <p>
    *   For abnormal termination, the sequence is truncated (before the
-   *   {@code NONE}) and ends with {@link #STOP}.  That is, the sequence begins
+   *   {@code NONE}) and ends with an exception.  That is, the sequence begins
    *   with a subsequence that is some prefix of a normal-completion sequence
-   *   and that does not contain {@code NONE}, and ends with {@code STOP}.
+   *   and that does not contain {@code NONE}, and ends with the
+   *   caller throwing a (query-fatal) exception.
    * </p>
    * <p>
    *   The normal-completion return sequence is matched by the following
@@ -97,14 +98,17 @@ public interface RecordBatch extends VectorAccessible {
    *     ( NOT_YET*  OK_NEW_SCHEMA
    *       NOT_YET*  OK )*
    *     )+
-   *     NOT_YET*    NONE</pre>
+   *     NOT_YET*    &lt;exception></pre>
    * </p>
    * <h4>Obsolete Outcomes</h4>
    *
-   * The former <tt>OUT_OF_MEMORY</tt> state was never really used.
+   * The former {@code OUT_OF_MEMORY} state was never really used.
    * It is now handled by calling
    * {@link FragmentContext#requestMemory()}
    * at the point that the operator realizes it is short on memory.
+   * <p>
+   * The former {@code STOP} state was replaced with a "fail fast"
+   * approach that throws an exception when an error is detected.
    */
   enum IterOutcome {
     /**
@@ -162,21 +166,6 @@ public interface RecordBatch extends VectorAccessible {
     OK_NEW_SCHEMA(false),
 
     /**
-     * Non-completion (abnormal) termination.
-     * <p>
-     *   The call to {@link #next()}
-     *   reports that the query has terminated other than by normal completion,
-     *   and that the caller must not call any of the schema-access or
-     *   data-access methods nor call {@code next()} again.
-     * </p>
-     * <p>
-     *   The caller can consume its QueryContext to understand the current state
-     *   of things.
-     * </p>
-     */
-    STOP(true),
-
-    /**
      * No data yet.
      * <p>
      *   The call to {@link #next()}
@@ -255,10 +244,17 @@ public interface RecordBatch extends VectorAccessible {
   BatchSchema getSchema();
 
   /**
-   * Informs child nodes that this query should be terminated.  Child nodes
-   * should use the QueryContext to determine what has happened.
+   * Informs child operators that no more data is needed. Only called
+   * for "normal" cancellation to avoid unnecessary compute in any worker
+   * threads. For the error case, the fragment
+   * executor will call close() on each child automatically.
+   * <p>
+   * The operator which triggers the cancel MUST send a <code>NONE</code>
+   * status downstream, or throw an exception. It is not legal to
+   * call <code>next()</code> on an operator after calling its
+   * <code>cancel()</code> method.
    */
-  void kill(boolean sendUpstream);
+  void cancel();
 
   VectorContainer getOutgoingContainer();
 
@@ -273,7 +269,7 @@ public interface RecordBatch extends VectorAccessible {
    * Gets the value vector type and ID for the given schema path.  The
    * TypedFieldId should store a fieldId which is the same as the ordinal
    * position of the field within the Iterator provided this class's
-   * implementation of Iterable<ValueVector>.
+   * implementation of {@code Iterable<ValueVector>}.
    *
    * @param path
    *          The path where the vector should be located.
@@ -315,14 +311,4 @@ public interface RecordBatch extends VectorAccessible {
    * Perform dump of this batch's state to logs.
    */
   void dump();
-
-  /**
-   * Use this method to see if the batch has failed. Currently used when logging {@code RecordBatch}'s
-   * state using {@link #dump()} method.
-   *
-   * @return {@code true} if either {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP}
-   * was returned by its or child's {@link #next()} invocation or there was an {@code Exception} thrown
-   * during execution of the batch; {@code false} otherwise
-   */
-  boolean hasFailed();
 }
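As a usage sketch of the new contract (not part of this patch), a limit-style operator would cancel its upstream once it has produced enough rows, assuming an `incoming` field that references the upstream batch and a `recordsLeft` counter (both names are illustrative):

    // Inside the operator's next() logic, once the limit has been satisfied:
    if (recordsLeft <= 0) {
      incoming.cancel();       // "normal" cancellation: no more data is needed upstream
      return IterOutcome.NONE; // the canceller must return NONE (or throw) downstream
    }

This mirrors the requirement spelled out in the cancel() javadoc above: after calling cancel() it is not legal to call next() on that operator again.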
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 9bd905f..29f4fc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -35,10 +35,10 @@ public class RecordBatchMemoryManager {
   // the incoming rows become wide, then less (than planned) would fit into the remaining current allocated memory)
   private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
   private int outgoingRowWidth;
-  private int outputBatchSize;
-  private RecordBatchSizer[] sizer;
-  private BatchStats[] inputBatchStats;
-  private BatchStats outputBatchStats;
+  private final int outputBatchSize;
+  private final RecordBatchSizer[] sizer;
+  private final BatchStats[] inputBatchStats;
+  private final BatchStats outputBatchStats;
 
   // By default, we expect one input batch stream and one output batch stream.
   // Some operators can get multiple input batch streams i.e. for example
@@ -323,7 +323,7 @@ public class RecordBatchMemoryManager {
   public void allocateVectors(VectorContainer container, int recordCount) {
     // Allocate memory for the vectors.
     // This will iteratively allocate memory for all nested columns underneath.
-    for (VectorWrapper w : container) {
+    for (VectorWrapper<?> w : container) {
       RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), recordCount);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 3db9814..4f5f02c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -145,7 +145,7 @@ public class RecordBatchSizer {
     /**
      * Child columns if this is a map column.
      */
-    private Map<String, ColumnSize> children = CaseInsensitiveMap.newHashMap();
+    private final Map<String, ColumnSize> children = CaseInsensitiveMap.newHashMap();
 
     /**
      * Returns true if there is an accurate std size. Otherwise it returns false.
@@ -249,6 +249,7 @@ public class RecordBatchSizer {
           case VARDECIMAL:
             stdNetSize = 4 + 8;
             break;
+          default:
         }
       } catch (Exception e) {
         stdNetSize = 0;
@@ -638,17 +639,17 @@ public class RecordBatchSizer {
 
   // This keeps information for only top level columns. Information for nested
   // columns can be obtained from children of topColumns.
-  private Map<String, ColumnSize> columnSizes = new QuoteInsensitiveMap(CaseInsensitiveMap.newHashMap());
+  private final Map<String, ColumnSize> columnSizes = new QuoteInsensitiveMap(CaseInsensitiveMap.newHashMap());
 
   /**
    * This field is used by the convenience method {@link #columnsList()}.
    */
-  private List<ColumnSize> columnSizesList = new ArrayList<>();
+  private final List<ColumnSize> columnSizesList = new ArrayList<>();
 
   /**
    * Number of records (rows) in the batch.
    */
-  private int rowCount;
+  private final int rowCount;
   /**
    * Actual batch size summing all buffers used to store data
    * for the batch.
@@ -677,7 +678,7 @@ public class RecordBatchSizer {
 
   private int avgDensity;
 
-  private Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
+  private final Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
 
   private long netBatchSize;
 
@@ -963,7 +964,7 @@ public class RecordBatchSizer {
   }
 
   public void allocateVectors(VectorContainer container, int recordCount) {
-    for (VectorWrapper w : container) {
+    for (VectorWrapper<?> w : container) {
       ColumnSize colSize = columnSizes.get(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), recordCount);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 91c1b5c..6d4e745 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -191,7 +191,6 @@ public class RecordIterator implements VectorAccessible {
       nextBatch();
       switch (lastOutcome) {
         case NONE:
-        case STOP:
           // No more data, disallow reads unless reset is called.
           outerPosition = nextOuterPosition;
           lastBatchRead = true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index e4278ba..d41bffa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -29,11 +29,8 @@ import java.util.Iterator;
  * Empty batch without schema and data.
  */
 public class SchemalessBatch implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemalessBatch.class);
 
-  public SchemalessBatch() {
-    logger.debug("Empty schemaless batch is created");
-  }
+  public SchemalessBatch() { }
 
   @Override
   public FragmentContext getContext() {
@@ -63,8 +60,7 @@ public class SchemalessBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-  }
+  public void cancel() { }
 
   @Override
   public VectorContainer getOutgoingContainer() {
@@ -109,12 +105,5 @@ public class SchemalessBatch implements CloseableRecordBatch {
   public VectorContainer getContainer() { return null; }
 
   @Override
-  public boolean hasFailed() {
-    return false;
-  }
-
-  @Override
-  public void dump() {
-    logger.error("SchemalessBatch[]");
-  }
+  public void dump() { }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index 09200f2..bcd8bea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -55,8 +55,7 @@ public class SimpleRecordBatch implements RecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
-  }
+  public void cancel() { }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
@@ -107,9 +106,4 @@ public class SimpleRecordBatch implements RecordBatch {
   public void dump() {
     logger.error("SimpleRecordBatch[container=" + container + "]");
   }
-
-  @Override
-  public boolean hasFailed() {
-    return false;
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleComplexWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleComplexWriter.java
index d989ddc..1100cea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleComplexWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleComplexWriter.java
@@ -24,7 +24,6 @@ import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 public class VectorAccessibleComplexWriter extends MapVector {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleComplexWriter.class);
 
   private final VectorContainer vc;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
index fcdd32b..e32c97e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
@@ -24,7 +24,6 @@ import org.junit.experimental.categories.Category;
 
 @Category({SqlTest.class, OperatorTest.class})
 public class TestCorrelation extends PlanTestBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCorrelation.class);
 
   @Test  // DRILL-2962
   public void testScalarAggCorrelatedSubquery() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index a2d0ef6..5273e98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -29,6 +29,7 @@ import org.apache.drill.categories.SqlFunctionTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -172,15 +173,18 @@ public class TestExampleQueries extends BaseTestQuery {
 
   @Test
   public void testJoinMerge() throws Exception {
-    test("alter session set `planner.enable_hashjoin` = false");
-    test("select count(*) \n" +
-        "  from (select l.l_orderkey as x, c.c_custkey as y \n" +
-        "  from cp.`tpch/lineitem.parquet` l \n" +
-        "    left outer join cp.`tpch/customer.parquet` c \n" +
-        "      on l.l_orderkey = c.c_custkey) as foo\n" +
-        "  where x < 10000\n" +
-        "");
-    test("alter session set `planner.enable_hashjoin` = true");
+    alterSession(PlannerSettings.ENABLE_HASH_JOIN_OPTION, false);
+    try {
+      test("select count(*) \n" +
+          "  from (select l.l_orderkey as x, c.c_custkey as y \n" +
+          "  from cp.`tpch/lineitem.parquet` l \n" +
+          "    left outer join cp.`tpch/customer.parquet` c \n" +
+          "      on l.l_orderkey = c.c_custkey) as foo\n" +
+          "  where x < 10000\n" +
+          "");
+    } finally {
+      resetSessionOption(PlannerSettings.ENABLE_HASH_JOIN_OPTION);
+    }
   }
 
   @Test
@@ -293,15 +297,19 @@ public class TestExampleQueries extends BaseTestQuery {
   @Test
   @Ignore("DRILL-3004")
   public void testJoin() throws Exception {
-    test("alter session set `planner.enable_hashjoin` = false");
-    test("SELECT\n" +
-        "  nations.N_NAME,\n" +
-        "  regions.R_NAME\n" +
-        "FROM\n" +
-        "  cp.`tpch/nation.parquet` nations\n" +
-        "JOIN\n" +
-        "  cp.`tpch/region.parquet` regions\n" +
-        "  on nations.N_REGIONKEY = regions.R_REGIONKEY where 1 = 0");
+    try {
+      alterSession(PlannerSettings.ENABLE_HASH_JOIN_OPTION, false);
+      test("SELECT\n" +
+          "  nations.N_NAME,\n" +
+          "  regions.R_NAME\n" +
+          "FROM\n" +
+          "  cp.`tpch/nation.parquet` nations\n" +
+          "JOIN\n" +
+          "  cp.`tpch/region.parquet` regions\n" +
+          "  on nations.N_REGIONKEY = regions.R_REGIONKEY where 1 = 0");
+    } finally {
+      resetSessionOption(PlannerSettings.ENABLE_HASH_JOIN_OPTION);
+    }
   }
 
 
@@ -509,7 +517,12 @@ public class TestExampleQueries extends BaseTestQuery {
   public void testOrderByDiffColumnsInSubqAndOuter() throws Exception {
     String query = "select n.n_nationkey from  (select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` order by n_regionkey) n  order by n.n_nationkey";
     // set slice_target = 1 to force exchanges
-    test("alter session set `planner.slice_target` = 1; " + query);
+    try {
+      alterSession(ExecConstants.SLICE_TARGET, 1);
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.SLICE_TARGET);
+    }
   }
 
   @Test // DRILL-1846  (this tests issue with UnionExchange)
@@ -517,7 +530,12 @@ public class TestExampleQueries extends BaseTestQuery {
   public void testLimitInSubqAndOrderByOuter() throws Exception {
     String query = "select t2.n_nationkey from (select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` t1 group by n_nationkey, n_regionkey limit 10) t2 order by t2.n_nationkey";
     // set slice_target = 1 to force exchanges
-    test("alter session set `planner.slice_target` = 1; " + query);
+    try {
+      alterSession(ExecConstants.SLICE_TARGET, 1);
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.SLICE_TARGET);
+    }
   }
 
   @Test // DRILL-1788
@@ -531,7 +549,12 @@ public class TestExampleQueries extends BaseTestQuery {
   public void test2PhaseAggAfterOrderBy() throws Exception {
     String query = "select count(*) from (select o_custkey from cp.`tpch/orders.parquet` order by o_custkey)";
     // set slice_target = 1 to force exchanges and 2-phase aggregation
-    test("alter session set `planner.slice_target` = 1; " + query);
+    try {
+      alterSession(ExecConstants.SLICE_TARGET, 1);
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.SLICE_TARGET);
+    }
   }
 
   @Test // DRILL-1867
@@ -578,18 +601,27 @@ public class TestExampleQueries extends BaseTestQuery {
   @Test
   public void testMultipleCountDistinctWithGroupBy() throws Exception {
     String query = "select n_regionkey, count(distinct n_nationkey), count(distinct n_name) from cp.`tpch/nation.parquet` group by n_regionkey;";
-    String hashagg_only = "alter session set `planner.enable_hashagg` = true; " +
-        "alter session set `planner.enable_streamagg` = false;";
-    String streamagg_only = "alter session set `planner.enable_hashagg` = false; " +
-        "alter session set `planner.enable_streamagg` = true;";
 
     // hash agg and streaming agg with default slice target (single phase aggregate)
-    test(hashagg_only + query);
-    test(streamagg_only + query);
-
     // hash agg and streaming agg with lower slice target (multiphase aggregate)
-    test("alter session set `planner.slice_target` = 1; " + hashagg_only + query);
-    test("alter session set `planner.slice_target` = 1; " + streamagg_only + query);
+    try {
+      alterSession(PlannerSettings.ENABLE_HASH_AGG_OPTION, true);
+      alterSession(PlannerSettings.ENABLE_STREAM_AGG_OPTION, false);
+      test(query);
+      alterSession(ExecConstants.SLICE_TARGET, 1);
+      test(query);
+      resetSessionOption(ExecConstants.SLICE_TARGET);
+
+      alterSession(PlannerSettings.ENABLE_HASH_AGG_OPTION, false);
+      alterSession(PlannerSettings.ENABLE_STREAM_AGG_OPTION, true);
+      test(query);
+      alterSession(ExecConstants.SLICE_TARGET, 1);
+      test(query);
+    } finally {
+      resetSessionOption(PlannerSettings.ENABLE_HASH_AGG_OPTION);
+      resetSessionOption(PlannerSettings.ENABLE_STREAM_AGG_OPTION);
+      resetSessionOption(ExecConstants.SLICE_TARGET);
+    }
   }
 
   @Test // DRILL-2019
@@ -826,12 +858,15 @@ public class TestExampleQueries extends BaseTestQuery {
 
   @Test // DRILL-2221
   public void createJsonWithEmptyList() throws Exception {
-    final String tableName = "jsonWithEmptyList";
-    test("USE dfs.tmp");
-    test("ALTER SESSION SET `store.format`='json'");
-    test("CREATE TABLE %s AS SELECT * FROM cp.`store/json/record_with_empty_list.json`", tableName);
-    test("SELECT COUNT(*) FROM %s", tableName);
-    test("ALTER SESSION SET `store.format`='parquet'");
+    try {
+      final String tableName = "jsonWithEmptyList";
+      test("USE dfs.tmp");
+      alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "json");
+      test("CREATE TABLE %s AS SELECT * FROM cp.`store/json/record_with_empty_list.json`", tableName);
+      test("SELECT COUNT(*) FROM %s", tableName);
+    } finally {
+      resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
+    }
   }
 
   @Test // DRILL-2914
@@ -993,13 +1028,18 @@ public class TestExampleQueries extends BaseTestQuery {
         "where 1 = 0";
 
 
-    testBuilder()
-        .sqlQuery(query)
-        .expectsEmptyResultSet()
-        .optionSettingQueriesForTestQuery("ALTER SESSION SET `planner.enable_hashjoin` = false; " +
-            "ALTER SESSION SET `planner.disable_exchanges` = true")
-        .build()
-        .run();
+    try {
+      setSessionOption(PlannerSettings.DISABLE_EXCHANGE_OPTION, true);
+      setSessionOption(PlannerSettings.ENABLE_HASH_JOIN_OPTION, false);
+      testBuilder()
+          .sqlQuery(query)
+          .expectsEmptyResultSet()
+          .build()
+          .run();
+    } finally {
+      resetSessionOption(PlannerSettings.DISABLE_EXCHANGE_OPTION);
+      resetSessionOption(PlannerSettings.ENABLE_HASH_JOIN_OPTION);
+    }
   }
 
   @Test
@@ -1031,8 +1071,6 @@ public class TestExampleQueries extends BaseTestQuery {
         "create table mytable4  partition by (r_regionkey, r_comment) as " +
         "  select  r.* from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r " +
         "  where n.n_regionkey = r.r_regionkey");
-
-
   }
 
   @Test // DRILL-3210
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
index e84bb27..4717993 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -25,7 +25,6 @@ import org.junit.experimental.categories.Category;
 
 @Category({SlowTest.class})
 public class TestTpchLimit0 extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchLimit0.class);
 
   private void testLimitZero(String fileName) throws Exception {
     String query = getFile(fileName);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
index a4d10b6..90e9e00 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
@@ -47,7 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
  * Class to test different planning use cases (separate from query execution)
- *
+ * (Though, despite the above, this test does execute queries.)
  */
 @Category({SlowTest.class, PlannerTest.class})
 public class DrillSeparatePlanningTest extends ClusterTest {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 054cdee..6e1ce9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -160,14 +160,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void cancel() {
     if (!limitWithUnnest) {
       isDone = true;
-      container.clear();
-      container.setRecordCount(0);
-      if (sv2 != null) {
-        sv2.clear();
-      }
     }
   }
 
@@ -254,7 +249,6 @@ public class MockRecordBatch implements CloseableRecordBatch {
         ++currentContainerIndex;
         return currentOutcome;
       case NONE:
-      case STOP:
         isDone = true;
       case NOT_YET:
         container.setRecordCount(0);
@@ -286,11 +280,6 @@ public class MockRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public boolean hasFailed() {
-    return false;
-  }
-
-  @Override
   public void dump() { }
 
   public static class Builder {
@@ -324,8 +313,6 @@ public class MockRecordBatch implements CloseableRecordBatch {
     }
 
     public Builder terminateWithError(IterOutcome errorOutcome) {
-      Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
-
       iterOutcomes.add(errorOutcome);
       return this;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 8f0d677..eb48f08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -105,7 +105,6 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   public boolean next() {
     switch (incoming.next()) {
     case NONE:
-    case STOP:
       return false;
     default:
       return true;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
index 335c5e8..17f529f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
@@ -88,6 +88,7 @@ public class TestBroadcastExchange extends PopUnitTestBase {
         }
         b.release();
       }
+      // Nothing done with count?
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index ae2a0db..9c2cff8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -62,8 +62,8 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
   private static BatchSchema batchSchemaLeft;
 
   // Input containers -- where row count is not set for the 2nd container !!
-  private List<VectorContainer> uninitialized2ndInputContainersRight = new ArrayList<>(5);
-  private List<VectorContainer> uninitialized2ndInputContainersLeft = new ArrayList<>(5);
+  private final List<VectorContainer> uninitialized2ndInputContainersRight = new ArrayList<>(5);
+  private final List<VectorContainer> uninitialized2ndInputContainersLeft = new ArrayList<>(5);
 
   private RowSet.SingleRowSet emptyInputRowSetRight;
   private RowSet.SingleRowSet emptyInputRowSetLeft;
@@ -191,16 +191,6 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
   }
 
   @Test
-  public void testHashJoinStopOutcomeUninitRightSide() {
-    testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP);
-  }
-
-  @Test
-  public void testHashJoinStopOutcomeUninitLeftSide() {
-    testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP);
-  }
-
-  @Test
   public void testHashJoinNoneOutcomeUninitRightSide() {
     testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 0e64600..b3add2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -154,7 +154,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
    * @return
    */
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
+    return (outcome == RecordBatch.IterOutcome.NONE);
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index c743582..c93f5cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -33,9 +33,8 @@ import static org.junit.Assert.assertTrue;
 public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
-   * Test to show empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by Limit and is pass through to
-   * the downstream operator.
-   * @throws Throwable
+   * Test to show that an empty batch with both OK_NEW_SCHEMA and EMIT outcomes is not
+   * ignored by Limit and is passed through to the downstream operator.
    */
   @Test
   public void testLimitEmptyBatchEmitOutcome() throws Throwable {
@@ -52,6 +51,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 1);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
@@ -63,9 +63,8 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   /**
-   * Test to validate limit considers all the data until it sees EMIT outcome and return output batch with data that
-   * meets the limit criteria.
-   * @throws Throwable
+   * Test to validate that Limit considers all the data until it sees an EMIT outcome
+   * and returns an output batch with data that meets the limit criteria.
    */
   @Test
   public void testLimitNonEmptyBatchEmitOutcome() throws Throwable {
@@ -82,6 +81,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 1);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
@@ -94,9 +94,9 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   /**
-   * Test to show that once a limit number of records is produced using first set of batches then on getting a batch
-   * with EMIT outcome, the limit state is again refreshed and applied to next set of batches with data.
-   * @throws Throwable
+   * Test to show that once the limit number of records is produced using the first
+   * set of batches, then on getting a batch with EMIT outcome, the limit state
+   * is refreshed and applied to the next set of batches with data.
    */
   @Test
   public void testLimitResetsAfterFirstEmitOutcome() throws Throwable {
@@ -119,6 +119,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 1);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
@@ -134,10 +135,10 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   /**
-   * Test to show that when the limit number of records is found with first incoming batch, then next empty incoming
-   * batch with OK outcome is ignored, but the empty EMIT outcome batch is not ignored. Empty incoming batch with
+   * Test to show that when the limit number of records is found within the first
+   * incoming batch, the next empty incoming batch with OK outcome is ignored,
+   * but the empty EMIT outcome batch is not ignored. An empty incoming batch with
    * EMIT outcome produces empty output batch with EMIT outcome.
-   * @throws Throwable
    */
   @Test
   public void testLimitNonEmptyFirst_EmptyOKEmitOutcome() throws Throwable {
@@ -157,6 +158,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 1);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
@@ -169,15 +171,18 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   /**
-   * Test to show that limit refreshes it's state after seeing first EMIT outcome and works on data batches following
-   * it as new set's of incoming batch and apply the limits rule from fresh on those. So for first set of batches with
-   * OK_NEW_SCHEMA and EMIT outcome but total number of records received being less than limit condition, it still
-   * produces an output with that many records (in this case 1 even though limit number of records is 2).
-   *
-   * After seeing EMIT, it refreshes it's state and operate on next input batches to again return limit number of
-   * records. So for 3rd batch with 2 records but with EMIT outcome it produces an output batch with 2 records not
-   * with 1 since state is refreshed.
-   * @throws Throwable
+   * Test to show that limit refreshes its state after seeing the first EMIT
+   * outcome and treats the data batches following it as a new set of incoming
+   * batches, applying the limit rule afresh to them. So for the first set of
+   * batches with OK_NEW_SCHEMA and EMIT outcomes, where the total number of
+   * records received is less than the limit, it still produces an output with
+   * that many records (in this case 1, even though the limit number of records
+   * is 2).
+   * <p>
+   * After seeing EMIT, it refreshes its state and operates on the next input
+   * batches to again return the limit number of records. So for the 3rd batch
+   * with 2 records and an EMIT outcome it produces an output batch with 2
+   * records, not 1, since the state was refreshed.
    */
   @Test
   public void testMultipleLimitWithEMITOutcome() throws Throwable {
@@ -201,6 +206,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 2);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
@@ -219,9 +225,8 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   /**
-   * Test shows that limit operates on multiple input batches until it finds limit number of records or it sees an
-   * EMIT outcome to refresh it's state.
-   * @throws Throwable
+   * Test shows that limit operates on multiple input batches until it finds
+   * the limit number of records or sees an EMIT outcome that refreshes its state.
    */
   @Test
   public void testLimitNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome() throws Throwable {
@@ -246,6 +251,7 @@ public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
     mockInputBatch.useUnnestKillHandlingForLimit(true);
 
     final Limit limitConf = new Limit(null, 0, 2);
+    @SuppressWarnings("resource")
     final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
       mockInputBatch);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
index d116e25..63cf2b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
 
 @Category(RowSetTests.class)
 public class TestOperatorRecordBatch extends SubOperatorTest {
-
   private static final Logger logger = LoggerFactory.getLogger(TestOperatorRecordBatch.class);
 
   /**
@@ -265,7 +264,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
     opExec.nextCalls = 2;
 
     try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
-      opBatch.kill(false);
+      opBatch.cancel();
       assertFalse(opExec.buildSchemaCalled);
       assertEquals(0, opExec.nextCount);
       assertFalse(opExec.cancelCalled);
@@ -287,7 +286,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
     try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
       assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
       assertEquals(IterOutcome.OK, opBatch.next());
-      opBatch.kill(false);
+      opBatch.cancel();
       assertTrue(opExec.cancelCalled);
     } catch (Exception e) {
       fail();
@@ -310,7 +309,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
       assertEquals(IterOutcome.OK, opBatch.next());
       assertEquals(IterOutcome.OK, opBatch.next());
       assertEquals(IterOutcome.NONE, opBatch.next());
-      opBatch.kill(false);
+      opBatch.cancel();
 
       // Already hit EOF, so fail won't be passed along.
 
@@ -342,7 +341,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
       fail();
     }
     assertTrue(opExec.closeCalled);
-    opBatch.kill(false);
+    opBatch.cancel();
     assertFalse(opExec.cancelCalled);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index adfb107..e37b375 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -139,7 +139,6 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
         }
         return currentOutcome;
       case NONE:
-      case STOP:
         isDone = true;
         return currentOutcome;
       case NOT_YET:
@@ -163,15 +162,10 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
   }
 
   @Override
-  public boolean hasFailed() {
-    return false;
-  }
+  public void dump() { }
 
   @Override
-  public void dump() {
-  }
-
-  @Override public int getRecordCount() {
+  public int getRecordCount() {
     return 0;
   }
 
@@ -190,23 +184,28 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
     return context;
   }
 
-  @Override public BatchSchema getSchema() {
+  @Override
+  public BatchSchema getSchema() {
     return null;
   }
 
-  @Override public void kill(boolean sendUpstream) {
-    unnest.kill(sendUpstream);
+  @Override
+  public void cancel() {
+    unnest.cancel();
   }
 
-  @Override public VectorContainer getOutgoingContainer() {
+  @Override
+  public VectorContainer getOutgoingContainer() {
     return null;
   }
 
-  @Override public TypedFieldId getValueVectorId(SchemaPath path) {
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
     return null;
   }
 
-  @Override public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
     return null;
   }
 
@@ -242,7 +241,8 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
   @Override
   public VectorContainer getContainer() { return null; }
 
-  @Override public Iterator<VectorWrapper<?>> iterator() {
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
     return null;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index bec4683..b4c41cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -567,7 +567,7 @@ import static org.junit.Assert.assertTrue;
         batchesProcessed++;
         if (batchesProcessed == execKill) {
           lateralJoinBatch.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
-          lateralJoinBatch.kill(true);
+          lateralJoinBatch.cancel();
         }
         // else nothing to do
       }
@@ -716,8 +716,7 @@ import static org.junit.Assert.assertTrue;
   }
 
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
+    return (outcome == RecordBatch.IterOutcome.NONE);
   }
-
 }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 0baf0a0..3540ef1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -812,10 +812,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
   }
 
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
+    return (outcome == RecordBatch.IterOutcome.NONE);
   }
 
-
   /**
    *     Run a plan like the following for various input batches :
    *             Lateral1
@@ -921,7 +920,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
           batchesProcessed++;
           if (batchesProcessed == execKill) {
             lateralJoinBatch1.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
-            lateralJoinBatch1.kill(true);
+            lateralJoinBatch1.cancel();
           }
           // else nothing to do
         }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 5875a40..1ec6739 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -79,7 +79,7 @@ public class BloomFilterTest extends SubOperatorTest {
     }
 
     @Override
-    public void kill(boolean sendUpstream) {
+    public void cancel() {
     }
 
     @Override
@@ -118,13 +118,7 @@ public class BloomFilterTest extends SubOperatorTest {
     }
 
     @Override
-    public void dump() {
-    }
-
-    @Override
-    public boolean hasFailed() {
-      return false;
-    }
+    public void dump() { }
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 07d8e03..ecc3918 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -145,8 +145,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
             lastResultOutcome = operator.next();
             needToGrabNext = false;
           }
-          if (lastResultOutcome == RecordBatch.IterOutcome.NONE
-            || lastResultOutcome == RecordBatch.IterOutcome.STOP) {
+          if (lastResultOutcome == RecordBatch.IterOutcome.NONE) {
             return false;
           } else {
             return true;