Posted to commits@drill.apache.org by ih...@apache.org on 2020/02/12 10:52:09 UTC

[drill] 05/05: DRILL-7576: Fail fast for operator errors

This is an automated email from the ASF dual-hosted git repository.

ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 09b805aea4dafe50555b23945302cf8f6c491de8
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sat Feb 8 22:36:25 2020 -0800

    DRILL-7576: Fail fast for operator errors
    
    Converts operators to fail with a UserException rather than using
    the STOP iterator status. The result is clearer error messages
    and simpler code.
    
    closes #1975
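    
    In outline, the change replaces the old report-and-STOP convention
    with an immediate throw. A minimal sketch of the two styles, modeled
    on the StatisticsWriterRecordBatch hunk below (simplified, not
    verbatim from the commit; the wrapper method names are illustrative):
    
        // Old style: mark the fragment as failed, then return the STOP
        // iterator status so every operator's next() must propagate it.
        private IterOutcome writeStatsOld() {
          try {
            counter += statsRecordWriterImpl.writeStatistics(incoming.getRecordCount());
          } catch (IOException ex) {
            logger.error("Failure during query", ex);
            kill(false);
            context.getExecutorState().fail(ex);
            return IterOutcome.STOP;
          }
          return IterOutcome.OK;
        }
    
        // New style: fail fast with a UserException carrying context;
        // the fragment executor handles the error in one place.
        private IterOutcome writeStatsNew() {
          try {
            counter += statsRecordWriterImpl.writeStatistics(incoming.getRecordCount());
          } catch (IOException ex) {
            throw UserException.dataWriteError(ex)
                .addContext("Failure when writing statistics")
                .build(logger);
          }
          return IterOutcome.OK;
        }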
---
 .../common/exceptions/DrillRuntimeException.java   |   1 -
 .../drill/maprdb/tests/json/TestSimpleJson.java    |  18 +-
 .../templates/StatisticsRecordWriterImpl.java      |  26 +-
 .../drill/exec/client/LoggingResultsListener.java  |  11 +-
 .../exec/physical/config/UnorderedReceiver.java    |   1 -
 .../drill/exec/physical/impl/BaseRootExec.java     |  59 +++-
 .../drill/exec/physical/impl/ImplCreator.java      |  10 +-
 .../apache/drill/exec/physical/impl/RootExec.java  |  39 +--
 .../apache/drill/exec/physical/impl/ScanBatch.java |  51 ++--
 .../drill/exec/physical/impl/ScreenCreator.java    |  68 ++---
 .../physical/impl/StatisticsWriterRecordBatch.java | 116 ++++----
 .../physical/impl/TopN/PriorityQueueTemplate.java  |  23 +-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   | 319 +++++++++++----------
 .../exec/physical/impl/WriterRecordBatch.java      | 111 +++----
 .../exec/physical/impl/aggregate/HashAggBatch.java |   8 +-
 .../impl/aggregate/SpilledRecordBatch.java         |  20 +-
 .../physical/impl/aggregate/StreamingAggBatch.java |  12 +-
 .../physical/impl/filter/FilterRecordBatch.java    |   3 +-
 .../impl/filter/RuntimeFilterBatchCreator.java     |   6 +-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  41 +--
 .../exec/physical/impl/join/HashJoinBatch.java     |  14 +-
 .../exec/physical/impl/join/MergeJoinBatch.java    |   7 +-
 .../exec/physical/impl/join/RowKeyJoinBatch.java   |  10 +-
 .../exec/physical/impl/limit/LimitRecordBatch.java |   7 +-
 .../impl/limit/PartitionLimitRecordBatch.java      |   3 +-
 .../impl/mergereceiver/MergingRecordBatch.java     | 166 +++++------
 .../impl/metadata/MetadataControllerBatch.java     |  73 ++---
 .../impl/metadata/MetadataHandlerBatch.java        |  10 +-
 .../OrderedPartitionRecordBatch.java               |  37 +--
 .../partitionsender/PartitionSenderRootExec.java   |   6 +-
 .../impl/partitionsender/PartitionerDecorator.java |  18 +-
 .../impl/producer/ProducerConsumerBatch.java       |  38 +--
 .../impl/protocol/OperatorRecordBatch.java         |   5 -
 .../RangePartitionRecordBatch.java                 |   2 +-
 .../physical/impl/sort/SortRecordBatchBuilder.java |   7 +-
 .../impl/statistics/StatisticsMergeBatch.java      |  88 +++---
 .../impl/svremover/RemovingRecordBatch.java        |   3 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |  76 +++--
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  15 +-
 .../impl/unpivot/UnpivotMapsRecordBatch.java       |  42 +--
 .../physical/impl/window/FrameSupportTemplate.java |  41 ++-
 .../impl/window/NoFrameSupportTemplate.java        |  27 +-
 .../impl/window/WindowFrameRecordBatch.java        |  31 +-
 .../exec/physical/impl/window/WindowFramer.java    |   3 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  10 +-
 .../record/AbstractTableFunctionRecordBatch.java   |  19 +-
 .../exec/record/AbstractUnaryRecordBatch.java      |  15 +-
 .../drill/exec/record/RecordBatchLoader.java       |   3 +-
 .../drill/exec/record/SimpleRecordBatch.java       |   4 +-
 .../exec/record/VectorAccessibleUtilities.java     |   7 +
 .../exec/record/selection/SelectionVector4.java    |  52 ++--
 .../drill/exec/store}/StatisticsRecordWriter.java  |  21 +-
 .../exec/store/easy/json/JsonRecordWriter.java     |  10 +-
 .../easy/json/JsonStatisticsRecordWriter.java      |  27 +-
 .../drill/exec/work/fragment/FragmentExecutor.java |  36 ++-
 .../drill/exec/physical/impl/MockRecordBatch.java  |   5 +-
 .../drill/exec/physical/impl/SimpleRootExec.java   |   4 +-
 .../exec/physical/impl/TestStackAnalyzer.java      | 132 +++++++++
 .../impl/partitionsender/TestPartitionSender.java  |  32 ++-
 .../unnest/TestUnnestWithLateralCorrectness.java   |  55 ++--
 .../drill/exec/server/TestDrillbitResilience.java  |  11 +-
 .../exec/store/parquet/ParquetResultListener.java  |  17 +-
 .../org/apache/drill/test/DrillTestWrapper.java    |  48 ++--
 .../java/org/apache/drill/test/QueryBuilder.java   |  41 ++-
 .../org/apache/drill/test/QueryRowSetIterator.java |  17 +-
 .../org/apache/drill/jdbc/impl/DrillCursor.java    | 141 +++++----
 66 files changed, 1194 insertions(+), 1185 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
index c5c7170..35cf586 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
@@ -18,7 +18,6 @@
 package org.apache.drill.common.exceptions;
 
 public class DrillRuntimeException extends RuntimeException {
-
   private static final long serialVersionUID = -3796081521525479249L;
 
   public DrillRuntimeException() {
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index 9073858..a5be2ab 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -25,7 +25,6 @@ import java.io.InputStream;
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.SingleRowListener;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -53,6 +52,7 @@ public class TestSimpleJson extends BaseJsonTest {
 
   private static boolean tableCreated = false;
   private static String tablePath;
+  @Override
   protected String getTablePath() {
     return tablePath;
   }
@@ -148,16 +148,12 @@ public class TestSimpleJson extends BaseJsonTest {
     SingleRowListener listener = new SingleRowListener() {
       @Override
       protected void rowArrived(QueryDataBatch result) {
-        try {
-          final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-          loader.load(result.getHeader().getDef(), result.getData());
-          StringBuilder sb = new StringBuilder();
-          VectorUtil.appendVectorAccessibleContent(loader, sb, "|", false);
-          loader.clear();
-          queryResult.set("result", sb.toString());
-        } catch (SchemaChangeException e) {
-          queryResult.set("error", "true");
-        }
+        final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+        loader.load(result.getHeader().getDef(), result.getData());
+        StringBuilder sb = new StringBuilder();
+        VectorUtil.appendVectorAccessibleContent(loader, sb, "|", false);
+        loader.clear();
+        queryResult.set("result", sb.toString());
       }
     };
     testWithListener(QueryType.SQL, sql, listener);
diff --git a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java b/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
index f1c6962..3f8f4e2 100644
--- a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
+++ b/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
@@ -83,21 +83,16 @@ public class StatisticsRecordWriterImpl {
     }
   }
 
-  private void initFieldWriters() throws IOException {
+  private void initFieldWriters() {
     fieldConverters = Lists.newArrayList();
-    try {
-      int fieldId = 0;
-      for (VectorWrapper w : batch) {
-        if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
-          continue;
-        }
-        FieldReader reader = w.getValueVector().getReader();
-        FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getName(), reader);
-        fieldConverters.add(converter);
+    int fieldId = 0;
+    for (VectorWrapper<?> w : batch) {
+      if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+        continue;
       }
-    } catch(Exception e) {
-      logger.error("Failed to create FieldWriter.", e);
-      throw new IOException("Failed to initialize FieldWriters.", e);
+      FieldReader reader = w.getValueVector().getReader();
+      FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getName(), reader);
+      fieldConverters.add(converter);
     }
   }
 
@@ -114,10 +109,13 @@ public class StatisticsRecordWriterImpl {
           return recordWriter.getNewNullable${minor.class}Converter(fieldId, fieldName, reader);
         case REPEATED:
           return recordWriter.getNewRepeated${minor.class}Converter(fieldId, fieldName, reader);
+        default:
+          throw new UnsupportedOperationException();
       }
       </#list>
       </#list>
+      default:
+        throw new UnsupportedOperationException();
     }
-    throw new UnsupportedOperationException();
   }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
index 951b33d..25e472f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
@@ -76,13 +75,9 @@ public class LoggingResultsListener implements UserResultsListener {
     try {
       if (data != null) {
         count.addAndGet(header.getRowCount());
-        try {
-          loader.load(header.getDef(), data);
-          // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-          // SchemaChangeException, so check/clean catch clause below.
-        } catch (SchemaChangeException e) {
-          submissionFailed(UserException.systemError(e).build(logger));
-        }
+        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+        // SchemaChangeException.
+        loader.load(header.getDef(), data);
 
         try {
           switch(format) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index 3291283..da6f07a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("unordered-receiver")
 public class UnorderedReceiver extends AbstractReceiver{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class);
 
   @JsonCreator
   public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 24597ae..89239f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.DeferredException;
@@ -126,7 +125,7 @@ public abstract class BaseRootExec implements RootExec {
   }
 
   @Override
-  public void dumpBatches() {
+  public void dumpBatches(Throwable t) {
     if (operators == null) {
       return;
     }
@@ -134,22 +133,18 @@ public abstract class BaseRootExec implements RootExec {
       return;
     }
 
-    final int numberOfBatchesToDump = 2;
+    CloseableRecordBatch leafMost = findLeaf(operators, t);
+    if (leafMost == null) {
+      // Don't know which batch failed.
+      return;
+    }
+    int batchPosn = operators.indexOf(leafMost);
+    final int numberOfBatchesToDump = Math.min(batchPosn + 1, 2);
     logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
     // As batches are stored in a 'flat' List there is a need to filter out the failed batch
     // and a few of its parent (actual number of batches is set by a constant defined above)
-    List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
-    for (int i = operators.size() - 1; i >= 0; i--) {
-      CloseableRecordBatch batch = operators.get(i);
-      if (batch.hasFailed()) {
-        failedBatchStack.add(0, batch);
-        if (failedBatchStack.size() == numberOfBatchesToDump) {
-          break;
-        }
-      }
-    }
-    for (CloseableRecordBatch batch : failedBatchStack) {
-      batch.dump();
+    for (int i = 0; i < numberOfBatchesToDump; i++) {
+      operators.get(batchPosn--).dump();
     }
     logger.error("Batch dump completed.");
   }
@@ -184,4 +179,38 @@ public abstract class BaseRootExec implements RootExec {
       }
     }
   }
+
+  /**
+   * Given a list of operators and a stack trace, walks the stack trace and
+   * the operator list to find the leaf-most operator, which is the one
+ * that was active when the exception was thrown. Handles the cases in
+ * which no operator was active, an operator had multiple methods on
+ * the stack, or the exception was thrown in some class called by
+ * the operator.
+ * <p>
+ * Not all operators leave a mark in the trace. In particular, if the
+ * call stack passes only through base-class methods, then we have no way
+ * to know the actual class during the call. This is OK because such leaf
+ * methods are just pass-through operations; they are unlikely to fail.
+   *
+   * @param <T> the type of the operator. Parameterized to allow easier
+   * testing
+   * @param dag the list of operators from root-most to leaf-most
+   * @param e the exception thrown somewhere in the operator tree
+   * @return the leaf-most operator, if any
+   */
+  public static <T> T findLeaf(List<T> dag, Throwable e) {
+    StackTraceElement[] trace = e.getStackTrace();
+    for (int i = dag.size() - 1; i >= 0; i--) {
+      T leaf = dag.get(i);
+      String opName = leaf.getClass().getName();
+      for (StackTraceElement element : trace) {
+        String frameName = element.getClassName();
+        if (frameName.contentEquals(opName)) {
+          return leaf;
+        }
+      }
+    }
+    return null;
+  }
 }
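
A toy illustration of findLeaf() (the operator classes here are hypothetical;
only findLeaf() itself comes from this commit): the list runs root-most to
leaf-most, and, searching leaf-first, the first operator whose class name
appears in the exception's stack trace is reported as the failing one.

    // Illustrative test-style snippet (imports: java.util.ArrayList, List).
    // Hypothetical "operators"; LeafOp.fail() puts LeafOp on the trace.
    class RootOp { }
    class LeafOp {
      void fail() { throw new IllegalStateException("boom"); }
    }

    List<Object> dag = new ArrayList<>();
    LeafOp leaf = new LeafOp();
    dag.add(new RootOp());   // root-most first
    dag.add(leaf);           // leaf-most last
    try {
      leaf.fail();
    } catch (IllegalStateException e) {
      // LeafOp.fail() is in e's stack trace, so findLeaf() returns leaf.
      Object failing = BaseRootExec.findLeaf(dag, e);
      assert failing == leaf;
    }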
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 87f22be..a1f25c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -107,8 +107,9 @@ public class ImplCreator {
     return null;
   }
 
-  /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
-
+  /**
+   * Create RootExec and its children (RecordBatches) for given FragmentRoot
+   */
   @SuppressWarnings("unchecked")
   private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException {
     final List<RecordBatch> childRecordBatches = getChildren(root, context);
@@ -132,8 +133,9 @@ public class ImplCreator {
     }
   }
 
-
-  /** Create a RecordBatch and its children for given PhysicalOperator */
+  /**
+   * Create a RecordBatch and its children for given PhysicalOperator
+   */
   @VisibleForTesting
   public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
     Preconditions.checkNotNull(op);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 34f2131..95607a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -20,34 +20,41 @@ package org.apache.drill.exec.physical.impl;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 /**
- * <h2>Functionality</h2>
- * <p>
- *   A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- *   output nodes and storage nodes.  They are there driving force behind the completion of a query.
- * </p>
- * <h2>Assumptions</h2>
- * <p>
- *   All implementations of {@link RootExec} assume that all their methods are called by the same thread.
+ * Node which is the last processing node in a query plan. FragmentTerminals
+ * include Exchange output nodes and storage nodes. They are the driving force
+ * behind the completion of a query.
  * </p>
+ * All implementations of {@link RootExec} assume that all their
+ * methods are called by the same thread.
  */
 public interface RootExec extends AutoCloseable {
+
   /**
    * Do the next batch of work.
-   * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
+   *
+   * @return Whether or not additional batches of work are necessary. False
+   *         means that this fragment is done.
    */
   boolean next();
 
   /**
-   * Inform sender that receiving fragment is finished and doesn't need any more data. This can be called multiple
-   * times (once for each downstream receiver). If all receivers are finished then a subsequent call to {@link #next()}
-   * will return false.
-   * @param handle The handle pointing to the downstream receiver that does not need anymore data.
+   * Inform sender that receiving fragment is finished and doesn't need any more
+   * data. This can be called multiple times (once for each downstream
+   * receiver). If all receivers are finished then a subsequent call to
+   * {@link #next()} will return false.
+   *
+   * @param handle
+   *          The handle pointing to the downstream receiver that does not need
+   *          any more data.
    */
   void receivingFragmentFinished(FragmentHandle handle);
 
   /**
-   * Dump failed batches' state preceded by its parent's state to logs. Invoked when there is a
-   * failure during fragment execution.
+   * Dump the failed batch's state, preceded by its parent's state, to the
+   * logs. Invoked when there is a failure during fragment execution.
+   *
+   * @param t the exception thrown by an operator; its stack trace records
+   * which operators were active when the failure occurred
    */
-  void dumpBatches();
+  void dumpBatches(Throwable t);
 }
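
The matching call-site change is in FragmentExecutor, whose hunk is not shown
in this excerpt; presumably its failure path now forwards the causal exception
so that dumpBatches() can locate the failing operator. An assumed shape,
illustrative only (shouldContinue() and fail() stand in for the executor's own
state handling):

    try {
      // Pump batches until the root operator reports completion.
      while (shouldContinue() && root.next()) {
      }
    } catch (Exception e) {
      // The exception's stack trace identifies the failing operator.
      root.dumpBatches(e);
      fail(e);
    }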
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f464b27..d314794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -202,7 +204,7 @@ public class ScanBatch implements CloseableRecordBatch {
    * @return whether we could continue iteration
    * @throws Exception
    */
-  private boolean shouldContinueAfterNoRecords() throws Exception {
+  private boolean shouldContinueAfterNoRecords() {
     logger.trace("scan got 0 record.");
     if (isRepeatableScan) {
       if (!currentReader.hasNext()) {
@@ -212,13 +214,17 @@ public class ScanBatch implements CloseableRecordBatch {
       }
       return true;
     } else { // Regular scan
-      currentReader.close();
-      currentReader = null;
+      closeCurrentReader();
       return true; // In regular case, we always continue the iteration, if no more reader, we will break out at the head of loop
     }
   }
 
-  private IterOutcome internalNext() throws Exception {
+  private void closeCurrentReader() {
+    AutoCloseables.closeSilently(currentReader);
+    currentReader = null;
+  }
+
+  private IterOutcome internalNext() {
     while (true) {
       if (currentReader == null && !getNextReaderIfHas()) {
         logger.trace("currentReader is null");
@@ -281,18 +287,6 @@ public class ScanBatch implements CloseableRecordBatch {
       clearFieldVectorMap();
       lastOutcome = IterOutcome.STOP;
       throw UserException.memoryError(ex).build(logger);
-    } catch (ExecutionSetupException e) {
-      if (currentReader != null) {
-        try {
-          currentReader.close();
-        } catch (final Exception e2) {
-          logger.error("Close failed for reader " + currentReaderClassName, e2);
-        }
-      }
-      lastOutcome = IterOutcome.STOP;
-      throw UserException.internalError(e)
-          .addContext("Setup failed for", currentReaderClassName)
-          .build(logger);
     } catch (UserException ex) {
       lastOutcome = IterOutcome.STOP;
       throw ex;
@@ -309,15 +303,11 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   private void clearFieldVectorMap() {
-    for (final ValueVector v : mutator.fieldVectorMap().values()) {
-      v.clear();
-    }
-    for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
-      v.clear();
-    }
+    VectorAccessibleUtilities.clear(mutator.fieldVectorMap().values());
+    VectorAccessibleUtilities.clear(mutator.implicitFieldVectorMap.values());
   }
 
-  private boolean getNextReaderIfHas() throws ExecutionSetupException {
+  private boolean getNextReaderIfHas() {
     if (!readers.hasNext()) {
       return false;
     }
@@ -326,8 +316,15 @@ public class ScanBatch implements CloseableRecordBatch {
       readers.remove();
     }
     implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
-    currentReader.setup(oContext, mutator);
     currentReaderClassName = currentReader.getClass().getSimpleName();
+    try {
+      currentReader.setup(oContext, mutator);
+    } catch (ExecutionSetupException e) {
+      closeCurrentReader();
+      throw UserException.executionError(e)
+          .addContext("Failed to setup reader", currentReaderClassName)
+          .build(logger);
+    }
     return true;
   }
 
@@ -405,7 +402,6 @@ public class ScanBatch implements CloseableRecordBatch {
     return fqn;
   }
 
-
   /**
    * Row set mutator implementation provided to record readers created by
    * this scan batch. Made visible so that tests can create this mutator
@@ -414,7 +410,6 @@ public class ScanBatch implements CloseableRecordBatch {
    * in turn, the only use of the generated vector readers in the vector
    * package.)
    */
-
   @VisibleForTesting
   public static class Mutator implements OutputMutator {
     /** Flag keeping track whether top-level schema has changed since last inquiry (via #isNewSchema}).
@@ -589,9 +584,7 @@ public class ScanBatch implements CloseableRecordBatch {
   public void close() throws Exception {
     container.clear();
     mutator.clear();
-    if (currentReader != null) {
-      currentReader.close();
-    }
+    closeCurrentReader();
   }
 
   @Override
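
The clearFieldVectorMap() rewrite above leans on a new overload in
VectorAccessibleUtilities (its +7-line hunk is not part of this excerpt).
A guess at its shape, inferred purely from the two call sites:

    // Assumed helper, inferred from usage; not shown in this excerpt.
    public static void clear(Iterable<ValueVector> vectors) {
      for (ValueVector v : vectors) {
        v.clear();
      }
    }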
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 625bfb3..765e1de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -83,47 +83,47 @@ public class ScreenCreator implements RootCreator<Screen> {
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
-      case STOP:
-        return false;
-      case NONE:
-        if (firstBatch) {
-          // this is the only data message sent to the client and may contain the schema
-          QueryWritableBatch batch;
-          QueryData header = QueryData.newBuilder()
-            .setQueryId(context.getHandle().getQueryId())
-            .setRowCount(0)
-            .setDef(RecordBatchDef.getDefaultInstance())
-            .build();
-          batch = new QueryWritableBatch(header);
+        case STOP:
+          return false;
+        case NONE:
+          if (firstBatch) {
+            // this is the only data message sent to the client and may contain the schema
+            QueryWritableBatch batch;
+            QueryData header = QueryData.newBuilder()
+              .setQueryId(context.getHandle().getQueryId())
+              .setRowCount(0)
+              .setDef(RecordBatchDef.getDefaultInstance())
+              .build();
+            batch = new QueryWritableBatch(header);
+
+            stats.startWait();
+            try {
+              userConnection.sendData(batch);
+            } finally {
+              stats.stopWait();
+            }
+            firstBatch = false; // we don't really need to set this. But who knows!
+          }
 
+          return false;
+        case OK_NEW_SCHEMA:
+          materializer = new VectorRecordMaterializer(context, oContext, incoming);
+          //$FALL-THROUGH$
+        case OK:
+          injector.injectPause(context.getExecutionControls(), "sending-data", logger);
+          final QueryWritableBatch batch = materializer.convertNext();
+          updateStats(batch);
           stats.startWait();
           try {
             userConnection.sendData(batch);
           } finally {
             stats.stopWait();
           }
-          firstBatch = false; // we don't really need to set this. But who knows!
-        }
-
-        return false;
-      case OK_NEW_SCHEMA:
-        materializer = new VectorRecordMaterializer(context, oContext, incoming);
-        //$FALL-THROUGH$
-      case OK:
-        injector.injectPause(context.getExecutionControls(), "sending-data", logger);
-        final QueryWritableBatch batch = materializer.convertNext();
-        updateStats(batch);
-        stats.startWait();
-        try {
-          userConnection.sendData(batch);
-        } finally {
-          stats.stopWait();
-        }
-        firstBatch = false;
-
-        return true;
-      default:
-        throw new UnsupportedOperationException();
+          firstBatch = false;
+
+          return true;
+        default:
+          throw new UnsupportedOperationException(outcome.name());
       }
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
index 93aadc6..e54f76d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.physical.impl;
 
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -36,18 +37,19 @@ import org.apache.drill.exec.store.StatisticsRecordWriterImpl;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.VarCharVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
-
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsWriterRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(StatisticsWriterRecordBatch.class);
 
   private StatisticsRecordWriterImpl statsRecordWriterImpl;
   private StatisticsRecordWriter recordWriter;
-  private long counter = 0;
+  private long counter;
   private final RecordBatch incoming;
-  private boolean processed = false;
+  private boolean processed;
   private final String fragmentUniqueId;
   private BatchSchema schema;
 
@@ -91,41 +93,46 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
     }
     // process the complete upstream in one next() call
     IterOutcome upstream;
-    try {
-      do {
-        upstream = next(incoming);
-
-        switch(upstream) {
-          case STOP:
-            return upstream;
-
-          case NOT_YET:
-          case NONE:
-            break;
-
-          case OK_NEW_SCHEMA:
-            setupNewSchema();
-            // $FALL-THROUGH$
-          case OK:
+    do {
+      upstream = next(incoming);
+
+      switch(upstream) {
+        case STOP:
+          return upstream;
+
+        case NOT_YET:
+        case NONE:
+          break;
+
+        case OK_NEW_SCHEMA:
+          setupNewSchema();
+          // $FALL-THROUGH$
+        case OK:
+          try {
             counter += statsRecordWriterImpl.writeStatistics(incoming.getRecordCount());
-            logger.debug("Total records written so far: {}", counter);
-
-            for(final VectorWrapper<?> v : incoming) {
-              v.getValueVector().clear();
-            }
-            break;
-
-          default:
-            throw new UnsupportedOperationException();
-        }
-      } while(upstream != IterOutcome.NONE);
-      // Flush blocking writers now
+          } catch (IOException e) {
+            throw UserException.dataWriteError(e)
+              .addContext("Failure when writing statistics")
+              .build(logger);
+          }
+          logger.debug("Total records written so far: {}", counter);
+
+          for(final VectorWrapper<?> v : incoming) {
+            v.getValueVector().clear();
+          }
+          break;
+
+        default:
+          throw new UnsupportedOperationException();
+      }
+    } while(upstream != IterOutcome.NONE);
+    // Flush blocking writers now
+    try {
       statsRecordWriterImpl.flushBlockingWriter();
-    } catch(IOException ex) {
-      logger.error("Failure during query", ex);
-      kill(false);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
+    } catch (IOException ex) {
+      throw UserException.executionError(ex)
+        .addContext("Failure when flushing the block writer")
+        .build(logger);
     }
 
     addOutputContainerData();
@@ -154,7 +161,7 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
     container.setRecordCount(1);
   }
 
-  protected void setupNewSchema() throws IOException {
+  protected void setupNewSchema() {
     try {
       // update the schema in RecordWriter
       stats.startSetup();
@@ -175,33 +182,29 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
       stats.stopSetup();
     }
 
-    statsRecordWriterImpl = new StatisticsRecordWriterImpl(incoming, recordWriter);
+    try {
+      statsRecordWriterImpl = new StatisticsRecordWriterImpl(incoming, recordWriter);
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+            .addContext("Failure when creating the statistics record writer")
+            .build(logger);
+    }
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     schema = container.getSchema();
   }
 
-  /** Clean up needs to be performed before closing writer. Partially written data will be removed. */
+  /**
+   * Clean up needs to be performed before closing writer. Partially written
+   * data will be removed.
+   */
   private void closeWriter() {
     if (recordWriter == null) {
       return;
     }
 
-    try {
-      //Perform any cleanup prior to closing the writer
-      recordWriter.cleanup();
-    } catch(IOException ex) {
-      context.getExecutorState().fail(ex);
-    } finally {
-      try {
-        if (!processed) {
-          recordWriter.abort();
-        }
-      } catch (IOException e) {
-        logger.error("Abort failed. There could be leftover output files.", e);
-      } finally {
-        recordWriter = null;
-      }
-    }
+    //Perform any cleanup prior to closing the writer
+    recordWriter.cleanup();
+    recordWriter = null;
   }
 
   @Override
@@ -209,5 +212,4 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
     closeWriter();
     super.close();
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 3ffa1db..0719f0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
@@ -37,9 +36,19 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class PriorityQueueTemplate implements PriorityQueue {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
+  private static final Logger logger = LoggerFactory.getLogger(PriorityQueueTemplate.class);
+
+  /**
+   * The estimated maximum queue size used when allocating the SV4
+   * for the queue. If the queue is larger, then a) we should probably
+   * be using a sort instead of top N, and b) the code will automatically
+   * grow the SV4 as needed up to the max supported size.
+   */
+  public static final int EST_MAX_QUEUE_SIZE = 4000;
 
   // This holds the min heap of the record indexes. Heapify condition is based on actual record though. Only records
   // meeting the heap condition have their indexes in this heap. Actual record are stored inside the hyperBatch. Since
@@ -54,8 +63,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
 
   // Limit determines the number of record to output and hold in queue.
   private int limit;
-  private int queueSize = 0;
-  private int batchCount = 0;
+  private int queueSize;
+  private int batchCount;
   private boolean hasSv2;
 
   @Override
@@ -142,11 +151,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
   public void generate() {
     Stopwatch watch = Stopwatch.createStarted();
     final DrillBuf drillBuf = allocator.buffer(4 * queueSize);
-    try {
-      finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
-    } catch (SchemaChangeException e) {
-      throw AbstractRecordBatch.schemaChangeException(e, "Priority Queue", logger);
-    }
+    finalSv4 = new SelectionVector4(drillBuf, queueSize, EST_MAX_QUEUE_SIZE);
     for (int i = queueSize - 1; i >= 0; i--) {
       finalSv4.set(i, pop());
     }
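
The sizing arithmetic in generate() above: each SelectionVector4 entry is a
four-byte compound record index, so the buffer is allocated at 4 * queueSize
bytes, and EST_MAX_QUEUE_SIZE is only the initial capacity hint handed to the
SV4. A small worked example, assuming that four-byte entry layout:

    int queueSize = 1000;             // records retained by Top N
    int bufferBytes = 4 * queueSize;  // 4,000-byte DrillBuf for the SV4
    // EST_MAX_QUEUE_SIZE (4000) is a growth hint, not a hard limit: per
    // the javadoc above, the SV4 grows as needed up to the supported max.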
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 33b2196..d600f12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -17,13 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.TopN;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -31,7 +31,6 @@ import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -79,12 +78,14 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 
 /**
- * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
- * it doesn't have to store all the input data to sort it first and then apply limit on the sorted data. Instead
- * internally it maintains a priority queue backed by a heap with the size being same as limit value.
+ * Operator Batch which implements the TopN functionality. It is more efficient
+ * than (sort + limit) since, unlike sort, it doesn't have to store all the
+ * input data to sort it first and then apply the limit to the sorted data.
+ * Instead, it internally maintains a priority queue backed by a heap whose
+ * size is the same as the limit value.
  */
 public class TopNBatch extends AbstractRecordBatch<TopN> {
-  static final Logger logger = LoggerFactory.getLogger(TopNBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(TopNBatch.class);
 
   private final MappingSet mainMapping = createMainMappingSet();
   private final MappingSet leftMapping = createLeftMappingSet();
@@ -139,7 +140,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
+  public void buildSchema() {
     IterOutcome outcome = next(incoming);
     switch (outcome) {
       case OK:
@@ -184,157 +185,157 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     // Reset the TopN state for next iteration
     resetTopNState();
 
-    try {
-      boolean incomingHasSv2 = false;
-      switch (incoming.getSchema().getSelectionVectorMode()) {
-        case NONE: {
-          break;
-        }
-        case TWO_BYTE: {
-          incomingHasSv2 = true;
-          break;
-        }
-        case FOUR_BYTE: {
-          throw new SchemaChangeException("TopN doesn't support incoming with SV4 mode");
-        }
-        default:
-          throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
+    boolean incomingHasSv2 = false;
+    switch (incoming.getSchema().getSelectionVectorMode()) {
+      case NONE: {
+        break;
+      }
+      case TWO_BYTE: {
+        incomingHasSv2 = true;
+        break;
       }
+      case FOUR_BYTE: {
+        throw UserException.internalError(null)
+          .message("TopN doesn't support incoming with SV4 mode")
+          .build(logger);
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
+    }
 
-      outer: while (true) {
-        Stopwatch watch = Stopwatch.createStarted();
-        if (first) {
-          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
-          // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
-          sv4 = new SelectionVector4(context.getAllocator(), 0);
-          first = false;
-        } else {
-          lastKnownOutcome = next(incoming);
-        }
-        if (lastKnownOutcome == OK && schema == null) {
-          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
-          container.clear();
-        }
-        logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
-        switch (lastKnownOutcome) {
-        case NONE:
-          break outer;
-        case NOT_YET:
-          throw new UnsupportedOperationException();
-        case STOP:
-          return lastKnownOutcome;
-        case OK_NEW_SCHEMA:
-          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
-          // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
-          // is enabled it will be handled.
-          container.clear();
-          firstBatchForSchema = true;
-          if (!incoming.getSchema().equals(schema)) {
-            if (schema != null) {
-              if (!unionTypeEnabled) {
-                throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
-                  "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
-                  ExecConstants.ENABLE_UNION_TYPE_KEY));
-              } else {
-                this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
-                purgeAndResetPriorityQueue();
-                this.schemaChanged = true;
-              }
+    outer: while (true) {
+      Stopwatch watch = Stopwatch.createStarted();
+      if (first) {
+        lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+        // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
+        sv4 = new SelectionVector4(context.getAllocator(), 0);
+        first = false;
+      } else {
+        lastKnownOutcome = next(incoming);
+      }
+      if (lastKnownOutcome == OK && schema == null) {
+        lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+        container.clear();
+      }
+      logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
+      switch (lastKnownOutcome) {
+      case NONE:
+        break outer;
+      case NOT_YET:
+        throw new UnsupportedOperationException();
+      case STOP:
+        return lastKnownOutcome;
+      case OK_NEW_SCHEMA:
+        // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+        // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
+        // is enabled it will be handled.
+        container.clear();
+        firstBatchForSchema = true;
+        if (!incoming.getSchema().equals(schema)) {
+          if (schema != null) {
+            if (!unionTypeEnabled) {
+              throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
+                "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
+                ExecConstants.ENABLE_UNION_TYPE_KEY));
             } else {
-              this.schema = incoming.getSchema();
+              schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
+              purgeAndResetPriorityQueue();
+              schemaChanged = true;
             }
-          }
-          // fall through.
-        case OK:
-        case EMIT:
-          if (incoming.getRecordCount() == 0) {
-            for (VectorWrapper<?> w : incoming) {
-              w.clear();
-            }
-            // Release memory for incoming SV2 vector
-            if (incomingHasSv2) {
-              incoming.getSelectionVector2().clear();
-            }
-            break;
-          }
-          countSincePurge += incoming.getRecordCount();
-          batchCount++;
-          RecordBatchData batch;
-          if (schemaChanged) {
-            batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
           } else {
-            batch = new RecordBatchData(incoming, oContext.getAllocator());
+            schema = incoming.getSchema();
           }
-          boolean success = false;
-          try {
-            if (priorityQueue == null) {
-              priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
-            } else if (!priorityQueue.isInitialized()) {
-              // means priority queue is cleaned up after producing output for first record boundary. We should
-              // initialize it for next record boundary
-              priorityQueue.init(config.getLimit(), oContext.getAllocator(),
-                schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-            }
-            priorityQueue.add(batch);
-            // Based on static threshold of number of batches, perform purge operation to release the memory for
-            // RecordBatches which are of no use or doesn't fall under TopN category
-            if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
-              purge();
-              countSincePurge = 0;
-              batchCount = 0;
-            }
-            success = true;
-          } finally {
-            if (!success) {
-              batch.clear();
-            }
+        }
+        // fall through.
+      case OK:
+      case EMIT:
+        if (incoming.getRecordCount() == 0) {
+          for (VectorWrapper<?> w : incoming) {
+            w.clear();
+          }
+          // Release memory for incoming SV2 vector
+          if (incomingHasSv2) {
+            incoming.getSelectionVector2().clear();
           }
           break;
-        default:
-          throw new UnsupportedOperationException();
         }
-
-        // If the last seen outcome is EMIT then break the loop. We do it here since we want to process the batch
-        // with records and EMIT outcome in above case statements
-        if (lastKnownOutcome == EMIT) {
-          break;
+        countSincePurge += incoming.getRecordCount();
+        batchCount++;
+        RecordBatchData batch;
+        if (schemaChanged) {
+          batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
+        } else {
+          batch = new RecordBatchData(incoming, oContext.getAllocator());
+        }
+        boolean success = false;
+        try {
+          if (priorityQueue == null) {
+            priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
+          } else if (!priorityQueue.isInitialized()) {
+            // means priority queue is cleaned up after producing output for first record boundary. We should
+            // initialize it for next record boundary
+            priorityQueue.init(config.getLimit(), oContext.getAllocator(),
+              schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
+          }
+          priorityQueue.add(batch);
+          // Based on static threshold of number of batches, perform purge operation to release the memory for
+          // RecordBatches which are of no use or doesn't fall under TopN category
+          if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
+            purge();
+            countSincePurge = 0;
+            batchCount = 0;
+          }
+          success = true;
+        } catch (SchemaChangeException e) {
+          throw schemaChangeException(e, logger);
+        } finally {
+          if (!success) {
+            batch.clear();
+          }
         }
+        break;
+      default:
+        throw new UnsupportedOperationException();
       }
 
-      // PriorityQueue can be null here if first batch is received with OK_NEW_SCHEMA and is empty and second next()
-      // call returned NONE or EMIT.
-      // PriorityQueue can be uninitialized here if only empty batch is received between 2 EMIT outcome.
-      if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
-        // builder may be null at this point if the first incoming batch is empty
-        return handleEmptyBatches(lastKnownOutcome);
+      // If the last seen outcome is EMIT then break the loop. We do it here since we want to process the batch
+      // with records and EMIT outcome in above case statements
+      if (lastKnownOutcome == EMIT) {
+        break;
       }
+    }
 
-      priorityQueue.generate();
-      prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
-
-      // With EMIT outcome control will come here multiple times whereas without EMIT outcome control will only come
-      // here once. In EMIT outcome case if there is schema change in any iteration then that will be handled by
-      // lastKnownOutcome.
-      return getFinalOutcome();
-    } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
-      kill(false);
-      logger.error("Failure during query", ex);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
+    // PriorityQueue can be null here if first batch is received with OK_NEW_SCHEMA and is empty and second next()
+    // call returned NONE or EMIT.
+    // PriorityQueue can be uninitialized here if only empty batch is received between 2 EMIT outcome.
+    if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
+      // builder may be null at this point if the first incoming batch is empty
+      return handleEmptyBatches(lastKnownOutcome);
     }
+
+    priorityQueue.generate();
+    prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
+
+    // With EMIT outcome control will come here multiple times whereas without EMIT outcome control will only come
+    // here once. In EMIT outcome case if there is schema change in any iteration then that will be handled by
+    // lastKnownOutcome.
+    return getFinalOutcome();
   }
 
   /**
-   * When PriorityQueue is built up then it stores the list of limit number of record indexes (in heapSv4) which falls
-   * under TopN category. But it also stores all the incoming RecordBatches with all records inside a HyperContainer
-   * (hyperBatch). When a certain threshold of batches are reached then this method is called which copies the limit
-   * number of records whose indexes are stored in heapSv4 out of HyperBatch to a new VectorContainer and releases
-   * all other records and their batches. Later this new VectorContainer is stored inside the HyperBatch and it's
-   * corresponding indexes are stored in the heapSv4 vector. This is done to avoid holding up lot's of Record Batches
-   * which can create OutOfMemory condition.
-   * @throws SchemaChangeException
+   * As the PriorityQueue is built up, it stores the indexes of the "limit"
+   * top records (in heapSv4) which fall under the TopN category. But it also
+   * stores all the incoming RecordBatches, with all their records, inside a
+   * HyperContainer (hyperBatch). When a certain threshold of batches is
+   * reached, this method is called: it copies the limit number of records
+   * whose indexes are stored in heapSv4 out of the HyperBatch to a new
+   * VectorContainer and releases all other records and their batches. Later
+   * this new VectorContainer is stored inside the HyperBatch and its
+   * corresponding indexes are stored in the heapSv4 vector. This is done to
+   * avoid holding onto lots of RecordBatches, which can create an
+   * OutOfMemory condition.
    */
-  private void purge() throws SchemaChangeException {
+  private void purge() {
     Stopwatch watch = Stopwatch.createStarted();
     VectorContainer c = priorityQueue.getHyperBatch();
 
@@ -362,7 +363,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       // HyperContainer backing the priority queue out of it
       VectorContainer newQueue = new VectorContainer();
       builder.build(newQueue);
-      priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+      try {
+        priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+      } catch (SchemaChangeException e) {
+        throw schemaChangeException(e, logger);
+      }
       builder.getSv4().clear();
     } finally {
       DrillAutoCloseables.closeNoChecked(builder);
@@ -370,8 +375,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
   }
 
-  private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
-    throws SchemaChangeException, ClassTransformationException, IOException {
+  private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) {
     return createNewPriorityQueue(
       mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, unionTypeEnabled,
       codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode(), context);
@@ -392,8 +396,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   public static PriorityQueue createNewPriorityQueue(
     MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
     List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump,
-    int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context)
-          throws ClassTransformationException, IOException, SchemaChangeException {
+    int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context) {
     OptionSet optionSet = context.getOptions();
     FunctionLookupContext functionLookupContext = context.getFunctionRegistry();
     CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
@@ -407,11 +410,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     for (Ordering od : orderings) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(),
-          batch, collector, functionLookupContext, unionTypeEnabled);
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      }
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, functionLookupContext, unionTypeEnabled);
+      collector.reportErrors(logger);
       g.setMappingSet(leftMapping);
       HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
       g.setMappingSet(rightMapping);
@@ -436,7 +436,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     g.getEvalBlock()._return(JExpr.lit(0));
 
     PriorityQueue q = context.getImplementationClass(cg);
-    q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
+    try {
+      q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
+    } catch (SchemaChangeException e) {
+      throw TopNBatch.schemaChangeException(e, "Top N", logger);
+    }
     return q;
   }
 
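
For reference, the generated PriorityQueue behaves like a bounded heap that retains only the best `limit` records. A minimal sketch of that idea over plain ints, assuming ascending order (illustrative only, not the generated code):

    import java.util.PriorityQueue;

    // Illustrative sketch: keep the N smallest values seen so far by
    // evicting the current worst whenever a better value arrives.
    class TopNSketch {
      private final PriorityQueue<Integer> heap =
          new PriorityQueue<>((a, b) -> Integer.compare(b, a)); // max-heap
      private final int limit;

      TopNSketch(int limit) { this.limit = limit; }

      void add(int value) {
        if (heap.size() < limit) {
          heap.add(value);
        } else if (value < heap.peek()) {
          heap.poll(); // evict the current worst of the best N
          heap.add(value);
        }
      }
    }
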
@@ -445,9 +449,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
    * 1. Purge existing batches
    * 2. Promote newly created container for new schema.
    * 3. Recreate priority queue and reset with coerced container.
-   * @throws SchemaChangeException
    */
-  public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
+  public void purgeAndResetPriorityQueue() {
     final Stopwatch watch = Stopwatch.createStarted();
     final VectorContainer c = priorityQueue.getHyperBatch();
     final VectorContainer newContainer = new VectorContainer(oContext);
@@ -465,7 +468,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
       priorityQueue.cleanup();
       priorityQueue = createNewPriorityQueue(newSchemaContainer, config.getLimit());
-      priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
+      try {
+        priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
+      } catch (SchemaChangeException e) {
+        throw schemaChangeException(e, logger);
+      }
     } finally {
       builder.clear();
       builder.close();
@@ -484,7 +491,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   /**
-   * Resets TopNBatch state to process next incoming batches independent of already seen incoming batches.
+   * Resets TopNBatch state to process next incoming batches independent of
+   * already seen incoming batches.
    */
   private void resetTopNState() {
     lastKnownOutcome = OK;
@@ -548,8 +556,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   * @param batchBuilder - Builder to build hyper vector batches
-   * @throws SchemaChangeException
    */
-  private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder)
-    throws SchemaChangeException {
+  private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder) {
     final VectorContainer c = priorityQueue.getHyperBatch();
     final SelectionVector4 queueSv4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 60a6cf6..1b368d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl;
 
 import java.io.IOException;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -37,25 +38,29 @@ import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.VarCharVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/* Write the RecordBatch to the given RecordWriter. */
+/** Write the RecordBatch to the given RecordWriter. */
 public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(WriterRecordBatch.class);
 
   private EventBasedRecordWriter eventBasedRecordWriter;
   private RecordWriter recordWriter;
-  private long counter = 0;
+  private long counter;
   private final RecordBatch incoming;
-  private boolean processed = false;
+  private boolean processed;
   private final String fragmentUniqueId;
   private BatchSchema schema;
 
-  public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+  public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context,
+      RecordWriter recordWriter) throws OutOfMemoryException {
     super(writer, context, false);
     this.incoming = incoming;
 
     final FragmentHandle handle = context.getHandle();
-    fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+    fragmentUniqueId = String.format(
+        "%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     this.recordWriter = recordWriter;
   }
 
@@ -77,7 +82,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   @Override
   public IterOutcome innerNext() {
     if (processed) {
-//      cleanup();
       // if the upstream record batch is already processed and next() is called by
       // downstream then return NONE to indicate completion
       return IterOutcome.NONE;
@@ -85,46 +89,45 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
 
     // process the complete upstream in one next() call
     IterOutcome upstream;
-    try {
-      do {
-        upstream = next(incoming);
-
-        switch(upstream) {
-          case STOP:
-            return upstream;
-
-          case NOT_YET:
+    do {
+      upstream = next(incoming);
+
+      switch(upstream) {
+        case STOP:
+          return upstream;
+
+        case NOT_YET:
+          break;
+        case NONE:
+          if (schema != null) {
+            // Schema is for the output batch schema which is set up in setupNewSchema(). Since the output
+            // schema is fixed (Fragment(VARCHAR), Number of records written (BIGINT)) we should set it
+            // up even with 0 records for it to be reported back to the client.
             break;
-          case NONE:
-            if (schema != null) {
-              // Schema is for the output batch schema which is setup in setupNewSchema(). Since the output
-              // schema is fixed ((Fragment(VARCHAR), Number of records written (BIGINT)) we should set it
-              // up even with 0 records for it to be reported back to the client.
-              break;
-            }
-
-          case OK_NEW_SCHEMA:
-            setupNewSchema();
-            // $FALL-THROUGH$
-          case OK:
-            counter += eventBasedRecordWriter.write(incoming.getRecordCount());
-            logger.debug("Total records written so far: {}", counter);
+          }
 
-            for(final VectorWrapper<?> v : incoming) {
-              v.getValueVector().clear();
-            }
-            break;
-
-          default:
-            throw new UnsupportedOperationException();
-        }
-      } while(upstream != IterOutcome.NONE);
-    } catch(IOException ex) {
-      logger.error("Failure during query", ex);
-      kill(false);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
-    }
+        case OK_NEW_SCHEMA:
+          setupNewSchema();
+          // $FALL-THROUGH$
+        case OK:
+          try {
+            counter += eventBasedRecordWriter.write(incoming.getRecordCount());
+          } catch (IOException e) {
+            throw UserException.dataWriteError(e)
+              .addContext("Failure when writing the batch")
+              .build(logger);
+          }
+          logger.debug("Total records written so far: {}", counter);
+
+          for(final VectorWrapper<?> v : incoming) {
+            v.getValueVector().clear();
+          }
+          break;
+
+        default:
+          throw new UnsupportedOperationException();
+      }
+    } while(upstream != IterOutcome.NONE);
 
     addOutputContainerData();
     processed = true;
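
The conversion above is the fail-fast pattern this commit applies throughout: checked exceptions become unchecked UserExceptions with context instead of a STOP status. A minimal sketch against Drill's UserException builder (the helper class and its names are hypothetical):

    import java.io.IOException;
    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical helper: wrap a checked write failure into an unchecked
    // UserException carrying context, so the operator fails fast.
    class FailFastSketch {
      private static final Logger logger = LoggerFactory.getLogger(FailFastSketch.class);

      interface Write { long run() throws IOException; }

      static long writeOrFail(Write write) {
        try {
          return write.run();
        } catch (IOException e) {
          throw UserException.dataWriteError(e)
              .addContext("Failure when writing the batch")
              .build(logger);
        }
      }
    }
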
@@ -152,11 +155,17 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     container.setRecordCount(1);
   }
 
-  protected void setupNewSchema() throws IOException {
+  protected void setupNewSchema() {
     try {
       // update the schema in RecordWriter
       stats.startSetup();
-      recordWriter.updateSchema(incoming);
+      try {
+        recordWriter.updateSchema(incoming);
+      } catch (IOException e) {
+        throw UserException.dataWriteError(e)
+          .addContext("Failure updating record writer schema")
+          .build(logger);
+      }
       // Create two vectors for:
       //   1. Fragment unique id.
       //   2. Summary: currently contains number of records written.
@@ -173,7 +182,13 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       stats.stopSetup();
     }
 
-    eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
+    try {
+      eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+          .addContext("Failed to create the event record writer")
+          .build(logger);
+    }
     container.buildSchema(SelectionVectorMode.NONE);
     schema = container.getSchema();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b8f16d8..506b594 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -340,16 +340,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
 
     case UPDATE_AGGREGATOR:
-      context.getExecutorState().fail(UserException.unsupportedError()
+      throw UserException.unsupportedError()
           .message(SchemaChangeException.schemaChanged(
               "Hash aggregate does not support schema change",
               incomingSchema,
               incoming.getSchema()).getMessage())
-          .build(logger));
-      close();
-      killIncoming(false);
-      firstBatch = false;
-      return IterOutcome.STOP;
+          .build(logger);
     default:
       throw new IllegalStateException(String.format("Unknown state %s.", out));
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
index 33cad10..586d34d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
@@ -39,10 +39,9 @@ import java.io.InputStream;
 import java.util.Iterator;
 
 /**
- * A class to replace "incoming" - instead scanning a spilled partition file
+ * Replaces "incoming" - instead scanning a spilled partition file
  */
 public class SpilledRecordBatch implements CloseableRecordBatch {
-
   private static final Logger logger = LoggerFactory.getLogger(SpilledRecordBatch.class);
 
   private VectorContainer container;
@@ -137,13 +136,13 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
 
     context.getExecutorState().checkContinue();
 
-    if ( spilledBatches <= 0 ) { // no more batches to read in this partition
+    if (spilledBatches <= 0) { // no more batches to read in this partition
       this.close();
       lastOutcome = IterOutcome.NONE;
       return lastOutcome;
     }
 
-    if ( spillStream == null ) {
+    if (spillStream == null) {
       lastOutcome = IterOutcome.STOP;
       throw new IllegalStateException("Spill stream was null");
     }
@@ -153,7 +152,7 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
     }
 
     try {
-      if ( container.getNumberOfColumns() > 0 ) { // container already initialized
+      if (container.getNumberOfColumns() > 0) { // container already initialized
         // Pass our container to the reader because other classes (e.g. HashAggBatch, HashTable)
         // may have a reference to this container (as an "incoming")
         vas.readFromStreamWithContainer(container, spillStream);
@@ -163,11 +162,12 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
         container = vas.get();
       }
     } catch (IOException e) {
-      lastOutcome = IterOutcome.STOP;
-      throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(logger);
+      throw UserException.dataReadError(e)
+          .addContext("Failed reading from a spill file")
+          .build(logger);
     } catch (Exception e) {
-      lastOutcome = IterOutcome.STOP;
-      throw e;
+      // TODO: Catch the error closer to the cause and create a better error message.
+      throw UserException.executionError(e).build(logger);
     }
 
     spilledBatches--; // one less batch to read
@@ -206,7 +206,7 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
       spillSet.delete(spillFile);
     }
     catch (IOException e) {
-      /* ignore */
+      // ignore
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 11261cf..476b316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -21,7 +21,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 import java.util.List;
 
@@ -357,13 +356,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           lastKnownOutcome = EMIT;
           return OK_NEW_SCHEMA;
         } else {
-          context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
-              .schemaChanged("Streaming aggregate does not support schema changes", incomingSchema,
-                  incoming.getSchema()).getMessage()).build(logger));
-          close();
-          killIncoming(false);
-          lastKnownOutcome = STOP;
-          return IterOutcome.STOP;
+          throw UserException.schemaChangeError(SchemaChangeException.schemaChanged(
+                  "Streaming aggregate does not support schema changes", incomingSchema,
+                  incoming.getSchema()))
+              .build(logger);
         }
       default:
         throw new IllegalStateException(String.format("Unknown state %s.", aggOutcome));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b03e25f..295cfd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -132,8 +132,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.TWO_BYTE);
       return true;
+    } else {
+      return false;
     }
-    return false;
   }
 
   protected Filterer generateSV4Filterer() throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
index 65605f2..89449b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
@@ -27,9 +27,11 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import java.util.List;
 
-public class RuntimeFilterBatchCreator implements BatchCreator<RuntimeFilterPOP>{
+public class RuntimeFilterBatchCreator implements BatchCreator<RuntimeFilterPOP> {
   @Override
-  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, RuntimeFilterPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+                                       RuntimeFilterPOP config, List<RecordBatch> children)
+                                       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new RuntimeFilterRecordBatch(config, children.iterator().next(), context);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 28de51f..8b61d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -75,7 +75,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   private final long maxWaitingTime;
   private final long rfIdentifier;
 
-  public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming,
+      FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
     enableRFWaiting = context.getOptions().getBoolean(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY);
     maxWaitingTime = context.getOptions().getLong(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY);
@@ -106,11 +107,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   protected IterOutcome doWork() {
     originalRecordCount = incoming.getRecordCount();
     sv2.setBatchActualRecordCount(originalRecordCount);
-    try {
-      applyRuntimeFilter();
-    } catch (SchemaChangeException e) {
-      throw new UnsupportedOperationException(e);
-    }
+    applyRuntimeFilter();
     container.transferIn(incoming.getContainer());
     container.setRecordCount(originalRecordCount);
     updateStats();
@@ -129,7 +126,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     if (sv2 != null) {
       sv2.clear();
     }
@@ -168,8 +165,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   }
 
   /**
-   * Takes care of setting up HashHelper if RuntimeFilter is received and the HashHelper is not already setup. For each
-   * schema change hash64 should be reset and this method needs to be called again.
+   * Takes care of setting up the HashHelper if a RuntimeFilter is received
+   * and the HashHelper is not already set up. After each schema change,
+   * hash64 should be reset and this method called again.
    */
   private void setupHashHelper() {
     current = context.getRuntimeFilter(rfIdentifier);
@@ -196,7 +194,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
           ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
         }
-        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+        hash64 = hashHelper.getHash64(hashFieldExps.toArray(
+            new LogicalExpression[hashFieldExps.size()]),
+            typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
       } catch (Exception e) {
         throw UserException.internalError(e).build(logger);
       }
@@ -204,12 +204,12 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   }
 
   /**
-   * If RuntimeFilter is available then applies the filter condition on the incoming batch records and creates an SV2
-   * to store indexes which passes the filter condition. In case when RuntimeFilter is not available it just pass
+   * If a RuntimeFilter is available, applies the filter condition to the
+   * incoming batch records and creates an SV2 to store the indexes that pass
+   * the filter condition. When no RuntimeFilter is available, it just passes
    * through all the records from incoming batch to downstream.
-   * @throws SchemaChangeException
    */
-  private void applyRuntimeFilter() throws SchemaChangeException {
+  private void applyRuntimeFilter() {
     if (originalRecordCount <= 0) {
       sv2.setRecordCount(0);
       return;
@@ -238,7 +238,12 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       String fieldName = toFilterFields.get(0);
       int fieldId = field2id.get(fieldName);
       for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
-        long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+        long hash;
+        try {
+          hash = hash64.hash64Code(rowIndex, 0, fieldId);
+        } catch (SchemaChangeException e) {
+          throw new UnsupportedOperationException(e);
+        }
         boolean contain = bloomFilter.find(hash);
         if (contain) {
           sv2.setIndex(svIndex, rowIndex);
@@ -251,7 +256,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       for (int i = 0; i < toFilterFields.size(); i++) {
         BloomFilter bloomFilter = bloomFilters.get(i);
         String fieldName = toFilterFields.get(i);
-        computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+        try {
+          computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+        } catch (SchemaChangeException e) {
+          throw new UnsupportedOperationException(e);
+        }
       }
       for (int i = 0; i < originalRecordCount; i++) {
         boolean contain = bitSet.get(i);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5edea2c..eeedc72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -96,7 +96,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class implements the runtime execution for the Hash-Join operator
+ * Implements the runtime execution for the Hash-Join operator
  * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
  * <p>
  * This implementation splits the incoming Build side rows into multiple
@@ -125,7 +125,6 @@ import org.slf4j.LoggerFactory;
  * greater) is a waste, indicating that the number of partitions chosen was too
  * small.
  */
-
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
   private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
 
@@ -687,12 +686,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
    * @param isLeft is it the left or right
    */
   private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
-      batch.kill(true);
-      while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
-        VectorAccessibleUtilities.clear(batch);
-        upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
-      }
+    batch.kill(true);
+    while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+      VectorAccessibleUtilities.clear(batch);
+      upstream = next(isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+    }
   }
+
   private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
   private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 5c9525c..6eeacd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -27,6 +27,7 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JMod;
 import com.sun.codemodel.JVar;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -200,8 +201,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
         case FAILURE:
           status.left.clearInflightBatches();
           status.right.clearInflightBatches();
-          kill(false);
-          return IterOutcome.STOP;
+          // Should be handled at the source of the error to provide a better error message.
+          throw UserException.executionError(null)
+              .message("Merge failed")
+              .build(logger);
         case WAITING:
           return IterOutcome.NOT_YET;
         default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index e580dfa..bc8c53b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -36,10 +35,11 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implements RowKeyJoin {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RowKeyJoinBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(RowKeyJoinBatch.class);
 
   // primary table side record batch
   private final RecordBatch left;
@@ -51,7 +51,7 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
   private IterOutcome leftUpstream = IterOutcome.NONE;
   private IterOutcome rightUpstream = IterOutcome.NONE;
   private final List<TransferPair> transfers = Lists.newArrayList();
-  private int recordCount = 0;
+  private int recordCount;
   private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private RowKeyJoinState rkJoinState = RowKeyJoinState.INITIAL;
 
@@ -82,7 +82,7 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
   }
 
   @Override
-  protected void buildSchema() throws SchemaChangeException {
+  protected void buildSchema() {
     container.clear();
 
     rightUpstream = next(right);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 0bbaba1..bb5b38d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -114,7 +113,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     container.clear();
     transfers.clear();
 
@@ -139,9 +138,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     if (container.isSchemaChanged()) {
       container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
       return true;
+    } else {
+      return false;
     }
-
-    return false;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index 48264c6..a466e43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -22,7 +22,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import java.util.List;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -84,7 +83,7 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     container.clear();
     transfers.clear();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8845e7b..ecac4ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.PriorityQueue;
 
 import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -82,7 +82,7 @@ import com.sun.codemodel.JExpr;
 import io.netty.buffer.ByteBuf;
 
 /**
- * The MergingRecordBatch merges pre-sorted record batches from remote senders.
+ * Merges pre-sorted record batches from remote senders.
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
   private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class);
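
The merge itself is a standard k-way merge over pre-sorted inputs driven by a priority queue of per-sender heads. A minimal sketch over integer lists (illustrative only, not the operator's vector-based code):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Illustrative sketch: repeatedly emit the smallest head among the
    // senders, then refill the queue from the sender that was consumed.
    class KWayMergeSketch {
      static List<Integer> merge(List<List<Integer>> sortedSenders) {
        PriorityQueue<int[]> pqueue =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        List<Iterator<Integer>> heads = new ArrayList<>();
        for (int s = 0; s < sortedSenders.size(); s++) {
          Iterator<Integer> it = sortedSenders.get(s).iterator();
          heads.add(it);
          if (it.hasNext()) {
            pqueue.add(new int[] {it.next(), s}); // {value, sender id}
          }
        }
        List<Integer> merged = new ArrayList<>();
        while (!pqueue.isEmpty()) {
          int[] node = pqueue.poll();
          merged.add(node[0]);
          Iterator<Integer> it = heads.get(node[1]);
          if (it.hasNext()) {
            pqueue.add(new int[] {it.next(), node[1]});
          }
        }
        return merged;
      }
    }
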
@@ -134,19 +134,27 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
-  private RawFragmentBatch getNext(final int providerIndex) throws IOException {
+  private RawFragmentBatch getNext(final int providerIndex) {
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
       injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
-      final RawFragmentBatch b = provider.getNext();
+      RawFragmentBatch b;
+      try {
+        b = provider.getNext();
+      } catch (IOException e) {
+        // TODO: Better to handle inside getNext() to provide a better error message
+        throw UserException.dataReadError(e)
+            .addContext("Failed to read incoming merge batch")
+            .build(logger);
+      }
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
         inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
       }
       return b;
-    } catch(final InterruptedException e) {
+    } catch (final InterruptedException e) {
       // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
@@ -203,12 +211,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatch = tempBatchHolder[p];
             tempBatchHolder[p] = null;
           } else {
-            try {
-              rawBatch = getNext(p);
-            } catch (final IOException e) {
-              context.getExecutorState().fail(e);
-              return IterOutcome.STOP;
-            }
+            rawBatch = getNext(p);
           }
           checkContinue();
 
@@ -231,18 +234,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             rawBatches.add(rawBatch);
           } else {
             // keep reading till we get a batch with record count > 0 or we have no more batches to read i.e. we get null
-            try {
-              while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
-                // Do nothing
-              }
-              if (rawBatch == null) {
-                checkContinue();
-                createDummyBatch = true;
-              }
-            } catch (final IOException e) {
-              context.getExecutorState().fail(e);
-              clearBatches(rawBatches);
-              return IterOutcome.STOP;
+            while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+              // Do nothing
+            }
+            if (rawBatch == null) {
+              checkContinue();
+              createDummyBatch = true;
             }
             if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
               createDummyBatch = true;
@@ -307,15 +304,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       for (final RawFragmentBatch batch : incomingBatches) {
         // initialize the incoming batchLoaders
         final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
-        try {
-          batchLoaders[i].load(rbd, batch.getBody());
-          // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-          // SchemaChangeException, so check/clean catch clause below.
-        } catch(final SchemaChangeException e) {
-          logger.error("MergingReceiver failed to load record batch from remote host.  {}", e);
-          context.getExecutorState().fail(e);
-          return IterOutcome.STOP;
-        }
+        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws SchemaChangeException.
+        batchLoaders[i].load(rbd, batch.getBody());
         batch.release();
         ++batchOffsets[i];
         ++i;
@@ -325,10 +315,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // Ensure all the incoming batches have the identical schema.
       // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches.
-      if (!isSameSchemaAmongBatches(batchLoaders)) {
-        context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
-        return IterOutcome.STOP;
-      }
+      checkSameSchemaAmongBatches(batchLoaders);
 
       // create the outgoing schema and vector container, and allocate the initial batch
       final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -364,19 +351,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // populate the priority queue with initial values
       for (int b = 0; b < senderCount; ++b) {
         while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
-          try {
-            final RawFragmentBatch batch = getNext(b);
-            incomingBatches[b] = batch;
-            if (batch != null) {
-              batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
-            } else {
-              batchLoaders[b].clear();
-              batchLoaders[b] = null;
-              checkContinue();
-            }
-          } catch (IOException | SchemaChangeException e) {
-            context.getExecutorState().fail(e);
-            return IterOutcome.STOP;
+          final RawFragmentBatch batch = getNext(b);
+          incomingBatches[b] = batch;
+          if (batch != null) {
+            batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
+          } else {
+            batchLoaders[b].clear();
+            batchLoaders[b] = null;
+            checkContinue();
           }
         }
         if (batchLoaders[b] != null) {
@@ -399,21 +381,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
         RawFragmentBatch nextBatch;
-        try {
-          nextBatch = getNext(node.batchId);
+        nextBatch = getNext(node.batchId);
 
-          while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
-            nextBatch = getNext(node.batchId);
-          }
+        while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
+          nextBatch = getNext(node.batchId);
+        }
 
-          assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
-              : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
-          if (nextBatch == null) {
-            checkContinue();
-          }
-        } catch (final IOException e) {
-          context.getExecutorState().fail(e);
-          return IterOutcome.STOP;
+        assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
+            : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
+        if (nextBatch == null) {
+          checkContinue();
         }
 
         incomingBatches[node.batchId] = nextBatch;
@@ -441,14 +418,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         }
 
         final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
-        try {
-          batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
-          // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-          // SchemaChangeException, so check/clean catch clause below.
-        } catch(final SchemaChangeException ex) {
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
-        }
+        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws SchemaChangeException.
+        batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
         incomingBatches[node.batchId].release();
         batchOffsets[node.batchId] = 0;
 
@@ -457,12 +428,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           node.valueIndex = 0;
           pqueue.add(node);
         }
-
       } else {
         node.valueIndex++;
         pqueue.add(node);
       }
-
     }
 
     // set the value counts in the outgoing vectors
@@ -527,33 +496,30 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
+  public void buildSchema() {
     // find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
     tempBatchHolder = new RawFragmentBatch[fragProviders.length];
     int i = 0;
-    try {
-      while (true) {
-        if (i >= fragProviders.length) {
-          state = BatchState.DONE;
-          return;
-        }
-        final RawFragmentBatch batch = getNext(i);
-        if (batch == null) {
-          checkContinue();
-        }
-        if (batch.getHeader().getDef().getFieldCount() == 0) {
-          i++;
-          continue;
-        }
-        tempBatchHolder[i] = batch;
-        for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
-          final ValueVector v = container.addOrGet(MaterializedField.create(field));
-          v.allocateNew();
-        }
+    while (true) {
+      if (i >= fragProviders.length) {
+        state = BatchState.DONE;
+        return;
+      }
+      final RawFragmentBatch batch = getNext(i);
+      if (batch == null) {
+        checkContinue();
         break;
       }
-    } catch (final IOException e) {
-      throw new DrillRuntimeException(e);
+      if (batch.getHeader().getDef().getFieldCount() == 0) {
+        i++;
+        continue;
+      }
+      tempBatchHolder[i] = batch;
+      for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+        final ValueVector v = container.addOrGet(MaterializedField.create(field));
+        v.allocateNew();
+      }
+      break;
     }
     container.buildSchema(SelectionVectorMode.NONE);
     container.setEmpty();
@@ -625,18 +591,22 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     //No op
   }
 
-  private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
+  private void checkSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
     Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
 
     final BatchSchema schema = batchLoaders[0].getSchema();
 
     for (int i = 1; i < batchLoaders.length; i++) {
       if (!schema.equals(batchLoaders[i].getSchema())) {
-        logger.error("Schemas are different. Schema 1 : " + schema + ", Schema 2: " + batchLoaders[i].getSchema() );
-        return false;
+        throw UserException.schemaChangeError()
+          .message("Incoming batches for merging receiver have different schemas!")
+          .addContext("Schema 1: %s, Schema 2: %s",
+              schema.toString())
+          .addContext("Schema 2: %s",
+              batchLoaders[i].getSchema().toString())
+          .build(logger);
       }
     }
-    return true;
   }
 
   private void allocateOutgoing() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 15b103d..49dc42e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -92,10 +93,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Terminal operator for producing ANALYZE statement. This operator is responsible for converting
- * obtained metadata, fetching absent metadata from the Metastore and storing resulting metadata into the Metastore.
+ * Terminal operator for the ANALYZE statement. This operator is responsible
+ * for converting obtained metadata, fetching absent metadata from the
+ * Metastore, and storing the resulting metadata into the Metastore.
  * <p>
- * This operator has two inputs: left input contains metadata and right input contains statistics metadata.
+ * This operator has two inputs: left input contains metadata and right input
+ * contains statistics metadata.
  */
 public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataControllerPOP> {
   private static final Logger logger = LoggerFactory.getLogger(MetadataControllerBatch.class);
@@ -183,11 +186,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
           }
           break;
         default:
-          context.getExecutorState()
-              .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
-          close();
-          killIncoming(false);
-          return IterOutcome.STOP;
+          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
       }
     }
 
@@ -217,35 +216,17 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
           //fall through
         case OK:
           assert !firstRight : "First batch should be OK_NEW_SCHEMA";
-          try {
-            appendStatistics(statisticsCollector);
-          } catch (IOException e) {
-            context.getExecutorState().fail(e);
-            close();
-            killIncoming(false);
-            return IterOutcome.STOP;
-          }
+          appendStatistics(statisticsCollector);
           break;
         default:
-          context.getExecutorState()
-              .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
-          close();
-          killIncoming(false);
-          return IterOutcome.STOP;
+          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
       }
     }
     return null;
   }
 
   private IterOutcome handleLeftIncoming() {
-    try {
-      metadataUnits.addAll(getMetadataUnits(left.getContainer()));
-    } catch (Exception e) {
-      context.getExecutorState().fail(e);
-      close();
-      killIncoming(false);
-      return IterOutcome.STOP;
-    }
+    metadataUnits.addAll(getMetadataUnits(left.getContainer()));
     return IterOutcome.OK;
   }
 
@@ -265,11 +246,9 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
     MetastoreTableInfo metastoreTableInfo = popConfig.getContext().metastoreTableInfo();
 
     if (tables.basicRequests().hasMetastoreTableInfoChanged(metastoreTableInfo)) {
-      context.getExecutorState()
-          .fail(new IllegalStateException(String.format("Metadata for table [%s] was changed before analyze is finished", tableInfo.name())));
-      close();
-      killIncoming(false);
-      return IterOutcome.STOP;
+      throw UserException.executionError(null)
+        .message("Metadata for table [%s] was changed before analyze is finished", tableInfo.name())
+        .build(logger);
     }
 
     modify.overwrite(metadataUnits)
@@ -668,7 +647,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
     return metadataStatistics;
   }
 
-  private void appendStatistics(StatisticsRecordCollector statisticsCollector) throws IOException {
+  private void appendStatistics(StatisticsRecordCollector statisticsCollector) {
     if (context.getOptions().getOption(PlannerSettings.STATISTICS_USE)) {
       List<FieldConverter> fieldConverters = new ArrayList<>();
       int fieldId = 0;
@@ -683,16 +662,22 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         fieldConverters.add(converter);
       }
 
-      for (int counter = 0; counter < right.getRecordCount(); counter++) {
-        statisticsCollector.startStatisticsRecord();
-        // write the current record
-        for (FieldConverter converter : fieldConverters) {
-          converter.setPosition(counter);
-          converter.startField();
-          converter.writeField();
-          converter.endField();
+      try {
+        for (int counter = 0; counter < right.getRecordCount(); counter++) {
+          statisticsCollector.startStatisticsRecord();
+          // write the current record
+          for (FieldConverter converter : fieldConverters) {
+            converter.setPosition(counter);
+            converter.startField();
+            converter.writeField();
+            converter.endField();
+          }
+          statisticsCollector.endStatisticsRecord();
         }
-        statisticsCollector.endStatisticsRecord();
+      } catch (IOException e) {
+        throw UserException.dataWriteError(e)
+            .addContext("Failed to write metadata")
+            .build(logger);
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index c302ef2..600a170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -72,8 +72,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
- * Operator responsible for handling metadata returned by incoming aggregate operators and fetching
- * required metadata form the Metastore.
+ * Responsible for handling metadata returned by incoming aggregate operators
+ * and fetching required metadata from the Metastore.
  */
 public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHandlerPOP> {
   private static final Logger logger = LoggerFactory.getLogger(MetadataHandlerBatch.class);
@@ -128,11 +128,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
       case STOP:
         return outcome;
       default:
-        context.getExecutorState()
-            .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
-        close();
-        killIncoming(false);
-        return IterOutcome.STOP;
+        throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c32cdbf..f52554c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.FieldReference;
@@ -50,7 +51,6 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.impl.sort.SortBatch;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -87,6 +87,8 @@ import com.sun.codemodel.JExpr;
  * value is determined by where each record falls in the partition table. This
  * column is used by PartitionSenderRootExec to determine which bucket to assign
  * each record to.
+ * <p>
+ * This code is not used.
  */
 public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
   static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
@@ -267,7 +269,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       try {
         Thread.sleep(timeout);
       } catch (final InterruptedException e) {
-        throw new QueryCancelledException();
+        checkContinue();
       }
     }
   }
@@ -281,13 +283,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * table, and attempts to push the partition table to the distributed cache.
    * Whichever table gets pushed first becomes the table used by all fragments
    * for partitioning.
-   *
-   * @return True is successful. False if failed.
    */
-  private boolean getPartitionVectors() {
-    if (!saveSamples()) {
-      return false;
-    }
+  private void getPartitionVectors() {
+    saveSamples();
 
     CachedVectorContainer finalTable = null;
 
@@ -328,7 +326,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     for (VectorWrapper<?> w : finalTable.get()) {
       partitionVectors.add(w.getValueVector());
     }
-    return true;
   }
 
   private void buildTable() {
@@ -398,17 +395,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   /**
-   * Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer
-   * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
-   * Ordering determines the value of each column. These records will later be sorted based on the values in each
-   * column, in the same order as the orderings.
-   *
-   * @param sv4
-   * @param incoming
-   * @param outgoing
-   * @param orderings
-   * @return
-   * @throws SchemaChangeException
+   * Creates a copier that does a project for every Nth record from a
+   * VectorContainer incoming into VectorContainer outgoing. Each Ordering in
+   * orderings generates a column, and evaluation of the expression associated
+   * with each Ordering determines the value of each column. These records will
+   * later be sorted based on the values in each column, in the same order as
+   * the orderings.
    */
   private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
       List<Ordering> orderings, List<ValueVector> localAllocationVectors) {
@@ -546,7 +538,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * in the partition table
    *
    * @param batch
-   * @throws SchemaChangeException
    */
   protected void setupNewSchema(VectorAccessible batch) {
     container.clear();
@@ -606,7 +597,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     try {
       projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
     } catch (SchemaChangeException e) {
-      throw schemaChangeException(e, logger);
+      throw UserException.schemaChangeError(e)
+        .addContext("Unexpected schema change in the Ordered Partitioner")
+        .build(logger);
     }
   }
 
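The catch block above is the fail-fast pattern this commit applies throughout: a checked SchemaChangeException from generated code is wrapped in a UserException and thrown, rather than mapped to the STOP iterator status. A minimal sketch of the pattern, assuming a hypothetical generated class whose setup() declares SchemaChangeException:

    // Sketch only: 'generated' and the operator name are illustrative.
    try {
      generated.setup(context, incoming, this, transfers);
    } catch (SchemaChangeException e) {
      // Fail fast: build a user-facing error and throw it; the fragment
      // terminates without returning a STOP status upstream.
      throw UserException.schemaChangeError(e)
          .addContext("Unexpected schema change in <operator>")
          .build(logger);
    }
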
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 18fe9bc..3b7f78a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -47,8 +47,8 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.CopyUtil;
 
@@ -211,9 +211,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           incoming.kill(false);
           return false;
         }
-        for (VectorWrapper<?> v : incoming) {
-          v.clear();
-        }
+        VectorAccessibleUtilities.clear(incoming);
         return true;
       case NOT_YET:
       default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index 8646192..ec9b550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -39,17 +39,19 @@ import org.apache.drill.exec.testing.CountDownLatchInjection;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Decorator class to hide multiple Partitioner existence from the caller
- * since this class involves multithreaded processing of incoming batches
- * as well as flushing it needs special handling of OperatorStats - stats
- * since stats are not suitable for use in multithreaded environment
- * The algorithm to figure out processing versus wait time is based on following formula:
- * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
+ * Decorator class to hide the existence of multiple {@link Partitioner}s from
+ * the caller. Because this class performs multithreaded processing of incoming
+ * batches, as well as flushing, it needs special handling of OperatorStats:
+ * stats are not suitable for use in a multithreaded environment. The algorithm
+ * to figure out processing versus wait time is based on the following formula:
+ * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
  */
 public final class PartitionerDecorator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+  private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
 
   private final List<Partitioner> partitioners;
@@ -394,7 +396,7 @@ public final class PartitionerDecorator {
     }
 
     public ExecutionException getException() {
-      return this.exception;
+      return exception;
     }
 
     public OperatorStats getStats() {
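The javadoc formula above can be read with a small worked example. This is a hedged illustration only: it reads the formula literally, taking totalAllPartitionersProcessingTime as the sum over all partitioners, with invented timings in milliseconds.

    // Per-partitioner processing times (ms), invented for illustration.
    long[][] perBatchMs = {
        {10, 12},   // partitioner 0: 22 ms total
        {15, 20},   // partitioner 1: 35 ms total
        { 5,  7}    // partitioner 2: 12 ms total
    };
    long totalAllPartitionersProcessingTime = 0;
    long maxPartitionerSum = 0;
    for (long[] times : perBatchMs) {
      long sum = 0;
      for (long t : times) {
        sum += t;
      }
      totalAllPartitionersProcessingTime += sum;            // 69 ms in total
      maxPartitionerSum = Math.max(maxPartitionerSum, sum); // 35 ms maximum
    }
    // 69 - 35 = 34 ms counted as wait time rather than processing time.
    long totalWaitTime = totalAllPartitionersProcessingTime - maxPartitionerSum;
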
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 9d67b5d..8c8cc54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -38,9 +38,11 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(ProducerConsumerBatch.class);
 
   private final RecordBatch incoming;
   private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
@@ -48,7 +50,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
   private final BlockingDeque<RecordBatchDataWrapper> queue;
   private int recordCount;
   private BatchSchema schema;
-  private boolean stop = false;
+  private boolean stop;
   private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait for the producer to clean up
 
   protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
@@ -77,8 +79,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
       return IterOutcome.NONE;
     } else if (wrapper.failed) {
       return IterOutcome.STOP;
-    } else if (wrapper.outOfMemory) {
-      throw new OutOfMemoryException();
     }
 
     recordCount = wrapper.batch.getRecordCount();
@@ -143,13 +143,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
               throw new UnsupportedOperationException();
           }
         }
-      } catch (final OutOfMemoryException e) {
-        try {
-          queue.putFirst(RecordBatchDataWrapper.outOfMemory());
-        } catch (final InterruptedException ex) {
-          logger.error("Unable to enqueue the last batch indicator. Something is broken.", ex);
-          // TODO InterruptedException
-        }
       } catch (final InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
         throw new QueryCancelledException();
@@ -183,6 +176,10 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
   @Override
   protected void killIncoming(final boolean sendUpstream) {
     stop = true;
+  }
+
+  @Override
+  public void close() {
     producer.interrupt();
     try {
       producer.join();
@@ -190,11 +187,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
       logger.warn("Interrupted while waiting for producer thread");
       // TODO InterruptedException
     }
-  }
-
-  @Override
-  public void close() {
-    stop = true;
     try {
       cleanUpLatch.await();
     } catch (final InterruptedException e) {
@@ -216,29 +208,23 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
     final RecordBatchData batch;
     final boolean finished;
     final boolean failed;
-    final boolean outOfMemory;
 
-    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed, final boolean outOfMemory) {
+    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
       this.batch = batch;
       this.finished = finished;
       this.failed = failed;
-      this.outOfMemory = outOfMemory;
     }
 
     public static RecordBatchDataWrapper batch(final RecordBatchData batch) {
-      return new RecordBatchDataWrapper(batch, false, false, false);
+      return new RecordBatchDataWrapper(batch, false, false);
     }
 
     public static RecordBatchDataWrapper finished() {
-      return new RecordBatchDataWrapper(null, true, false, false);
+      return new RecordBatchDataWrapper(null, true, false);
     }
 
     public static RecordBatchDataWrapper failed() {
-      return new RecordBatchDataWrapper(null, false, true, false);
-    }
-
-    public static RecordBatchDataWrapper outOfMemory() {
-      return new RecordBatchDataWrapper(null, false, false, true);
+      return new RecordBatchDataWrapper(null, false, true);
     }
   }
 
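With the outOfMemory flag gone, the wrapper has exactly three states: a data batch, a "finished" marker, and a "failed" marker; OOM now surfaces as an exception instead of a fourth flag. A self-contained sketch of the same poison-pill idea with generic types (not the Drill classes):

    // Hedged sketch: a queue element whose flags signal end-of-stream
    // or producer failure; a null payload accompanies either marker.
    final class Wrapper<T> {
      final T batch;
      final boolean finished;
      final boolean failed;

      private Wrapper(T batch, boolean finished, boolean failed) {
        this.batch = batch;
        this.finished = finished;
        this.failed = failed;
      }

      static <T> Wrapper<T> batch(T t) { return new Wrapper<>(t, false, false); }
      static <T> Wrapper<T> finished() { return new Wrapper<>(null, true, false); }
      static <T> Wrapper<T> failed()   { return new Wrapper<>(null, false, true); }
    }
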
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 102bb4c..b3ca591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
  * batches. The <tt>TransferPair</tt> abstraction fails if different
  * vectors appear across batches.
  */
-
 public class OperatorRecordBatch implements CloseableRecordBatch {
   static final Logger logger = LoggerFactory.getLogger(OperatorRecordBatch.class);
 
@@ -149,10 +148,6 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
       driver.operatorContext().getStats().startProcessing();
       lastOutcome = driver.next();
       return lastOutcome;
-    } catch (Exception e) {
-      // mark batch as failed
-      lastOutcome = IterOutcome.STOP;
-      throw e;
     } finally {
       driver.operatorContext().getStats().stopProcessing();
     }
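With the catch-and-mark-STOP block removed, next() relies on try/finally so operator stats stop even when the operator throws; the exception itself carries the failure upward. A minimal sketch of the bracketing, assuming an OperatorStats-like object:

    stats.startProcessing();
    try {
      return driver.next();     // any failure propagates as an exception
    } finally {
      stats.stopProcessing();   // runs on success and on failure alike
    }
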
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 7a61489..56c6246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -107,7 +107,7 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa
    * @return True if the new schema differs from old schema, False otherwise
    */
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     container.clear();
 
     for (VectorWrapper<?> vw : incoming) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 94770b6..240ee53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.AllocationReservation;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -158,11 +157,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     if (svBuffer == null) {
       throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
     }
-    try {
-      sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
-    } catch (SchemaChangeException e) {
-      throw AbstractRecordBatch.schemaChangeException(e, "Sort", logger);
-    }
+    sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 104917e..7a88dc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -24,14 +24,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
@@ -47,8 +45,6 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.metastore.statistics.Statistic;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Example input and output:
@@ -86,9 +82,7 @@ import org.slf4j.LoggerFactory;
  *   .... another map for next stats function ....
  * </pre>
  */
-
 public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMerge> {
-  private static final Logger logger = LoggerFactory.getLogger(StatisticsMergeBatch.class);
 
   private final Map<String, String> functions;
   private boolean first = true;
@@ -110,8 +104,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
    * Creates key columns for the outgoing batch e.g. `schema`, `computed`. These columns are NOT
    * table columns for which statistics will be computed.
    */
-  private void createKeyColumn(String name, LogicalExpression expr)
-      throws SchemaChangeException {
+  private void createKeyColumn(String name, LogicalExpression expr) {
     LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
     MaterializedField outputField = MaterializedField.create(name, mle.getMajorType());
     ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -173,11 +166,12 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
     }
   }
 
-  /* Prepare the outgoing container. Generates the outgoing record batch schema.
+  /**
+   * Prepare the outgoing container. Generates the outgoing record batch schema.
    * Please look at the comments above the class definition which describes the
    * incoming/outgoing batch schema
    */
-  private void buildOutputContainer() throws SchemaChangeException {
+  private void buildOutputContainer() {
     // Populate the list of statistics which will be output in the schema
     for (VectorWrapper<?> vw : incoming) {
       for (String outputStatName : functions.keySet()) {
@@ -223,12 +217,13 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());
   }
 
-  /* Adds a value vector corresponding to the statistic in the outgoing record batch.
-   * Determines the MajorType based on the incoming value vector. Please look at the
-   * comments above the class definition which describes the incoming/outgoing batch schema
+  /**
+   * Adds a value vector corresponding to the statistic in the outgoing record
+   * batch. Determines the MajorType based on the incoming value vector. Please
+   * look at the comments above the class definition which describes the
+   * incoming/outgoing batch schema
    */
-  private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw)
-      throws SchemaChangeException {
+  private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw) {
     // Input map vector
     MapVector inputVector = (MapVector) vw.getValueVector();
     assert inputVector.getPrimitiveVectors().size() > 0;
@@ -265,7 +260,8 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
     }
   }
 
-  /* Prepare the outgoing container. Populates the outgoing record batch data.
+  /**
+   * Prepare the outgoing container. Populates the outgoing record batch data.
    * Please look at the comments above the class definition which describes the
    * incoming/outgoing batch schema
    */
@@ -301,7 +297,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     container.clear();
     // Generate the list of fields for which statistics will be merged
     buildColumnsList();
@@ -340,40 +336,34 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
     if (finished) {
       return IterOutcome.NONE;
     }
-    try {
-      outer: while (true) {
-        outcome = next(incoming);
-        switch (outcome) {
-          case NONE:
-            break outer;
-          case NOT_YET:
-          case STOP:
-            return outcome;
-          case OK_NEW_SCHEMA:
-            if (first) {
-              first = false;
-              if (!setupNewSchema()) {
-                outcome = IterOutcome.OK;
-              }
-              return outcome;
+    outer: while (true) {
+      outcome = next(incoming);
+      switch (outcome) {
+        case NONE:
+          break outer;
+        case NOT_YET:
+        case STOP:
+          return outcome;
+        case OK_NEW_SCHEMA:
+          if (first) {
+            first = false;
+            if (!setupNewSchema()) {
+              outcome = IterOutcome.OK;
             }
-            //fall through
-          case OK:
-            assert first == false : "First batch should be OK_NEW_SCHEMA";
-            IterOutcome out = doWork();
-            didSomeWork = true;
-            if (out != IterOutcome.OK) {
-              return out;
-            }
-            break;
-          default:
-            throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
-        }
+            return outcome;
+          }
+          //fall through
+        case OK:
+          assert !first : "First batch should be OK_NEW_SCHEMA";
+          IterOutcome out = doWork();
+          didSomeWork = true;
+          if (out != IterOutcome.OK) {
+            return out;
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
       }
-    } catch (SchemaChangeException ex) {
-      kill(false);
-      context.getExecutorState().fail(UserException.unsupportedError(ex).build(logger));
-      return IterOutcome.STOP;
     }
 
     // We can only get here if upstream is NONE i.e. no more batches. If we did some work prior to
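The de-indented loop above is the standard upstream-outcome routing, now free of the SchemaChangeException catch. A hedged skeleton of that routing, with the first-batch bookkeeping elided and doWork()/setupNewSchema() standing in for the operator-specific logic:

    outer: while (true) {
      IterOutcome outcome = next(incoming);
      switch (outcome) {
        case NONE:
          break outer;            // upstream exhausted
        case NOT_YET:
        case STOP:
          return outcome;         // passed straight through
        case OK_NEW_SCHEMA:
          setupNewSchema();       // then fall through to process the batch
        case OK:
          doWork();
          break;
        default:
          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
      }
    }
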
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index a9584bb..8a18093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -45,7 +44,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     // Don't clear off container just because an OK_NEW_SCHEMA was received from
     // upstream. For cases when there is just
     // change in container type but no actual schema change, RemovingRecordBatch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 85eceea..65d66ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -194,11 +194,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
         stats.batchReceived(0, incoming.getRecordCount(), true);
         memoryManager.update();
         hasRemainder = incoming.getRecordCount() > 0;
-      } catch (SchemaChangeException ex) {
-        kill(false);
-        logger.error("Failure during query", ex);
-        context.getExecutorState().fail(ex);
-        return IterOutcome.STOP;
       } finally {
         stats.stopSetup();
       }
@@ -209,32 +204,25 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        try {
-          boolean hasNewSchema = schemaChanged();
-          stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
-          if (hasNewSchema) {
-            setupNewSchema();
-            hasRemainder = true;
-            memoryManager.update();
-            return OK_NEW_SCHEMA;
-          } else { // Unnest field schema didn't changed but new left empty/nonempty batch might come with OK_NEW_SCHEMA
-            // This means even though there is no schema change for unnest field the reference of unnest field
-            // ValueVector must have changed hence we should just refresh the transfer pairs and keep output vector
-            // same as before. In case when new left batch is received with SchemaChange but was empty Lateral will
-            // not call next on unnest and will change it's left outcome to OK. Whereas for non-empty batch next will
-            // be called on unnest by Lateral. Hence UNNEST cannot rely on lateral current outcome to setup transfer
-            // pair. It should do for each new left incoming batch.
-            resetUnnestTransferPair();
-            container.zeroVectors();
-          } // else
-          unnest.resetGroupIndex();
+        boolean hasNewSchema = schemaChanged();
+        stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+        if (hasNewSchema) {
+          setupNewSchema();
+          hasRemainder = true;
           memoryManager.update();
-        } catch (SchemaChangeException ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
-        }
+          return OK_NEW_SCHEMA;
+        } else { // Unnest field schema didn't change, but a new (empty or non-empty) left batch might come with OK_NEW_SCHEMA
+          // This means that even though there is no schema change for the unnest field, the reference of the unnest
+          // field ValueVector must have changed, hence we should just refresh the transfer pairs and keep the output
+          // vector the same as before. When a new left batch arrives with a schema change but is empty, Lateral will
+          // not call next on unnest and will change its left outcome to OK, whereas for a non-empty batch next will
+          // be called on unnest by Lateral. Hence UNNEST cannot rely on Lateral's current outcome to set up the
+          // transfer pair; it should do so for each new left incoming batch.
+          resetUnnestTransferPair();
+          container.zeroVectors();
+        } // else
+        unnest.resetGroupIndex();
+        memoryManager.update();
       }
       return doWork();
     }
@@ -350,20 +338,24 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     return tp;
   }
 
-  private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
+  private TransferPair resetUnnestTransferPair() {
     List<TransferPair> transfers = Lists.newArrayList();
     FieldReference fieldReference = new FieldReference(popConfig.getColumn());
     TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
     transfers.add(transferPair);
     logger.debug("Added transfer for unnest expression.");
     unnest.close();
-    unnest.setup(context, incoming, this, transfers);
+    try {
+      unnest.setup(context, incoming, this, transfers);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
     setUnnestVector();
     return transferPair;
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     Preconditions.checkNotNull(lateral);
     container.clear();
     MaterializedField rowIdField = MaterializedField.create(rowIdColumnName, Types.required(TypeProtos
@@ -380,13 +372,13 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   }
 
   /**
-   * Compares the schema of the unnest column in the current incoming with the schema of
-   * the unnest column in the previous incoming.
-   * Also saves the schema for comparison in future iterations
+   * Compares the schema of the unnest column in the current incoming with the
+   * schema of the unnest column in the previous incoming. Also saves the schema
+   * for comparison in future iterations.
    *
    * @return true if the schema has changed, false otherwise
    */
-  private boolean schemaChanged() throws SchemaChangeException {
+  private boolean schemaChanged() {
     unnestTypedFieldId = checkAndGetUnnestFieldId();
     MaterializedField thisField = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
     MaterializedField prevField = unnestFieldMetadata;
@@ -430,12 +422,14 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
   }
 
-  private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
+  private TypedFieldId checkAndGetUnnestFieldId() {
     TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
     if (fieldId == null) {
-      throw new SchemaChangeException(String.format("Unnest column %s not found inside the incoming record batch. " +
-          "This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
-        popConfig.getColumn()));
+      throw UserException.schemaChangeError(null)
+          .message(String.format("Unnest column %s not found inside the incoming record batch. " +
+              "This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
+              popConfig.getColumn()))
+          .build(logger);
     }
 
     return fieldId;
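checkAndGetUnnestFieldId() above shows the fail-fast builder used for a user-visible validation problem rather than a wrapped exception: the cause is null and the message is set explicitly. A hedged sketch of that form ('column' is a placeholder):

    TypedFieldId fieldId = incoming.getValueVectorId(column);
    if (fieldId == null) {
      // No underlying exception to wrap, so pass null and supply
      // the message directly; build(logger) logs and returns it.
      throw UserException.schemaChangeError(null)
          .message("Column %s not found in the incoming record batch", column)
          .build(logger);
    }
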
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index f6304aa..b94c551 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -22,9 +22,9 @@ import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -150,7 +150,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
-  private RawFragmentBatch getNextBatch() throws IOException {
+  private RawFragmentBatch getNextBatch() {
     try {
       injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
       return fragProvider.getNext();
@@ -161,6 +161,10 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       Thread.currentThread().interrupt();
 
       return null;
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+          .addContext("Failure when reading incoming batch")
+          .build(logger);
     }
   }
 
@@ -215,13 +219,6 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
         lastOutcome = IterOutcome.OK;
       }
       return lastOutcome;
-    } catch (SchemaChangeException | IOException ex) {
-      context.getExecutorState().fail(ex);
-      lastOutcome = IterOutcome.STOP;
-      return lastOutcome;
-    } catch (Exception e) {
-      lastOutcome = IterOutcome.STOP;
-      throw e;
     } finally {
       stats.stopProcessing();
     }
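getNextBatch() above now separates two failure modes: interruption restores the thread's interrupt flag and returns null so shutdown can proceed, while an I/O failure fails fast. A hedged sketch of that split ('provider' is a placeholder for the fragment batch provider):

    try {
      return provider.getNext();              // may block waiting for data
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();     // preserve the interrupt status
      return null;                            // caller treats null as shutdown
    } catch (IOException e) {
      throw UserException.dataReadError(e)    // fail fast on real I/O errors
          .addContext("Failure when reading incoming batch")
          .build(logger);
    }
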
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 3562e9a..66e8b01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnpivotMaps;
@@ -116,38 +115,19 @@ public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMap
       case STOP:
         return upStream;
       case OK_NEW_SCHEMA:
-        if (first) {
-          first = false;
-        }
-        try {
-          if (!setupNewSchema()) {
-            upStream = IterOutcome.OK;
-          } else {
-            return upStream;
-          }
-        } catch (SchemaChangeException ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
-        }
-        //fall through
+        first = false;
+        setupNewSchema();
+        return upStream;
+
       case OK:
         assert first == false : "First batch should be OK_NEW_SCHEMA";
-        try {
-          container.zeroVectors();
-          IterOutcome out = doWork();
-          // Preserve OK_NEW_SCHEMA unless doWork() runs into an issue
-          if (out != IterOutcome.OK) {
-            upStream = out;
-          }
-        } catch (Exception ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
+        container.zeroVectors();
+        IterOutcome out = doWork();
+        // Preserve OK_NEW_SCHEMA unless doWork() runs into an issue
+        if (out != IterOutcome.OK) {
+          upStream = out;
         }
-       return upStream;
+        return upStream;
       default:
         throw new UnsupportedOperationException("Unsupported upstream state " + upStream);
     }
@@ -269,7 +249,7 @@ public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMap
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     container.clear();
     buildKeyList();
     buildOutputContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5288776..624b14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -34,9 +35,10 @@ import java.util.List;
 
 
 /**
- * WindowFramer implementation that supports the FRAME clause.
- * <br>According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
- * This class will handle such functions even if the FRAME clause is not present.
+ * WindowFramer implementation that supports the FRAME clause. <br>
+ * According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate
+ * functions support the FRAME clause. This class will handle such functions
+ * even if the FRAME clause is not present.
  */
 public abstract class FrameSupportTemplate implements WindowFramer {
 
@@ -90,7 +92,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
    * processes all rows of the first batch.
    */
   @Override
-  public void doWork() throws DrillException {
+  public void doWork() throws SchemaChangeException {
     int currentRow = 0;
 
     this.current = batches.get(0);
@@ -144,7 +146,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
    * @return index of next unprocessed row
    * @throws DrillException if it can't write into the container
    */
-  private int processPartition(final int currentRow) throws DrillException {
+  private int processPartition(final int currentRow) throws SchemaChangeException {
     logger.trace("{} rows remaining to process, currentRow: {}, outputCount: {}", remainingRows, currentRow, outputCount);
 
     setupWriteFirstValue(internal, container);
@@ -156,7 +158,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
     }
   }
 
-  private int processROWS(int row) throws DrillException {
+  private int processROWS(int row) throws SchemaChangeException {
     //TODO (DRILL-4413) we only need to call these once per batch
     setupEvaluatePeer(current, container);
     setupReadLastValue(current, container);
@@ -175,7 +177,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
     return row;
   }
 
-  private int processRANGE(int row) throws DrillException {
+  private int processRANGE(int row) throws SchemaChangeException {
     while (row < outputCount && !isPartitionDone()) {
       if (remainingPeers == 0) {
         // because all peer rows share the same frame, we only need to compute and aggregate the frame once
@@ -199,8 +201,10 @@ public abstract class FrameSupportTemplate implements WindowFramer {
   }
 
   /**
-   * updates partition's length after computing the number of rows for the current the partition starting at the specified
-   * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
+   * Updates the partition's length after computing the number of rows for the
+   * current partition, starting at the specified row of the first batch. If
+   * !requiresFullPartition, this method will only count the rows in the current
+   * batch.
    */
   private void updatePartitionSize(final int start) {
     logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
@@ -245,12 +249,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
   }
 
   /**
-   * aggregates all peer rows of current row
+   * Aggregates all peer rows of the current row
    * @param start starting row of the current frame
    * @return num peer rows for current row
-   * @throws SchemaChangeException
    */
-  private long aggregatePeers(final int start) throws SchemaChangeException {
+  private long aggregatePeers(final int start) {
     logger.trace("aggregating rows starting from {}", start);
 
     final boolean unboundedFollowing = popConfig.getEnd().isUnbounded();
@@ -260,7 +263,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
     // a single frame can include rows from multiple batches
     // start processing first batch and, if necessary, move to next batches
     for (WindowDataBatch batch : batches) {
-      setupEvaluatePeer(batch, container);
+      try {
+        setupEvaluatePeer(batch, container);
+      } catch (SchemaChangeException e) {
+        throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+      }
       final int recordCount = batch.getRecordCount();
 
       // for every remaining row in the partition, count it if it's a peer row
@@ -281,7 +288,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
       }
     }
 
-    setupReadLastValue(last, container);
+    try {
+      setupReadLastValue(last, container);
+    } catch (SchemaChangeException e) {
+      throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+    }
 
     return length;
   }
@@ -354,6 +365,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
    * @param b2 batch for second row
    * @return true if the rows are in the same partition
    */
+  @Override
   public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
                                           @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
 
@@ -366,6 +378,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
    * @param b2 batch for second row
    * @return true if the rows are in the same partition
    */
+  @Override
   public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
                                  @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
 }
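aggregatePeers() above walks a frame that can span several batches. A stripped-down, hedged sketch of that scan; 'currentRow' and 'current' identify the row whose frame is being built, and the real method additionally handles unbounded-following frames and the generated setup calls shown in the hunk:

    long length = 0;
    int row = start;                         // first row of the frame
    for (WindowDataBatch batch : batches) {
      int recordCount = batch.getRecordCount();
      for (; row < recordCount; row++, length++) {
        if (!isPeer(currentRow, current, row, batch)) {
          return length;                     // frame ends at the first non-peer row
        }
        evaluatePeer(row);                   // aggregate this frame row
      }
      row = 0;                               // resume at the top of the next batch
    }
    return length;
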
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index a759399..61c8070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -17,10 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.window;
 
-import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -80,10 +80,10 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
   }
 
   /**
-   * processes all rows of the first batch.
+   * Processes all rows of the first batch.
    */
   @Override
-  public void doWork() throws DrillException {
+  public void doWork() {
     int currentRow = 0;
     current = batches.get(0);
     outputCount = current.getRecordCount();
@@ -103,17 +103,25 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
         newPartition(current, currentRow);
       }
 
-      currentRow = processPartition(currentRow);
+      try {
+        currentRow = processPartition(currentRow);
+      } catch (SchemaChangeException e) {
+        throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+      }
       if (partition.isDone()) {
         cleanPartition();
       }
     }
   }
 
-  private void newPartition(WindowDataBatch current, int currentRow) throws SchemaChangeException {
+  private void newPartition(WindowDataBatch current, int currentRow) {
     partition = new Partition();
     updatePartitionSize(partition, currentRow);
-    setupPartition(current, container);
+    try {
+      setupPartition(current, container);
+    } catch (SchemaChangeException e) {
+      throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+    }
   }
 
   private void cleanPartition() {
@@ -138,10 +146,9 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
    * @param currentRow
    *          first unprocessed row
    * @return index of next unprocessed row
-   * @throws DrillException
-   *           if it can't write into the container
+   * @throws SchemaChangeException
    */
-  private int processPartition(int currentRow) throws DrillException {
+  private int processPartition(int currentRow) throws SchemaChangeException {
     logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
 
     setupCopyNext(current, container);
@@ -202,7 +209,7 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
     }
   }
 
-  private void processRow(int row) throws DrillException {
+  private void processRow(int row) throws SchemaChangeException {
     if (partition.isFrameDone()) {
       // because all peer rows share the same frame, we only need to compute and aggregate the frame once
       long peers = countPeers(row);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 496c776..ba8c06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -17,12 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.window;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -31,7 +29,6 @@ import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -145,13 +142,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     // process first saved batch, then release it
-    try {
-      doWork();
-    } catch (DrillException e) {
-      context.getExecutorState().fail(e);
-      cleanup();
-      return IterOutcome.STOP;
-    }
+    doWork();
 
     if (state == BatchState.FIRST) {
       state = BatchState.NOT_FIRST;
@@ -160,7 +151,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     return IterOutcome.OK;
   }
 
-  private void doWork() throws DrillException {
+  private void doWork() {
 
     WindowDataBatch current = batches.get(0);
     int recordCount = current.getRecordCount();
@@ -170,8 +161,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     // allocate outgoing vectors
     container.allocateNew();
 
-    for (WindowFramer framer : framers) {
-      framer.doWork();
+    try {
+      for (WindowFramer framer : framers) {
+        framer.doWork();
+      }
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
 
     // transfer "non aggregated" vectors
@@ -225,7 +220,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   @Override
-  protected void buildSchema() throws SchemaChangeException {
+  protected void buildSchema() {
     logger.trace("buildSchema()");
     IterOutcome outcome = next(incoming);
     switch (outcome) {
@@ -242,8 +237,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
 
     try {
       createFramers(incoming);
-    } catch (IOException | ClassTransformationException e) {
-      throw new SchemaChangeException("Exception when creating the schema", e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
 
     if (incoming.getRecordCount() > 0) {
@@ -251,7 +246,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
   }
 
-  private void createFramers(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
+  private void createFramers(VectorAccessible batch) throws SchemaChangeException {
     assert framers == null : "createFramer should only be called once";
 
     logger.trace("creating framer(s)");
@@ -321,7 +316,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   private WindowFramer generateFramer(List<LogicalExpression> keyExprs, List<LogicalExpression> orderExprs,
-      List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+      List<WindowFunction> functions, boolean useCustomFrame) {
 
     TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
       WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 4bb3d38..7552a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -40,9 +40,9 @@
 
   /**
    * process the inner batch and write the aggregated values in the container
-   * @throws DrillException
+   * @throws SchemaChangeException
    */
-  void doWork() throws DrillException;
+  void doWork() throws SchemaChangeException;
 
   /**
    * @return number rows processed in last batch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index f05b9f2..3a54c5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.record;
 
 import java.util.Iterator;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
@@ -178,12 +177,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
           break;
       }
       return lastOutcome;
-    } catch (SchemaChangeException e) {
-      lastOutcome = IterOutcome.STOP;
-      throw new DrillRuntimeException(e);
-    } catch (Exception e) {
-      lastOutcome = IterOutcome.STOP;
-      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -200,8 +193,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     }
   }
 
-  protected void buildSchema() throws SchemaChangeException {
-  }
+  protected void buildSchema() { }
 
   @Override
   public void kill(boolean sendUpstream) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index d9b5f4b..8341b95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -24,16 +24,19 @@ import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 /**
- * Implements AbstractUnaryRecodBatch for operators that do not have an incoming record batch available at creation
- * time; the input is typically set up a few steps after creation. Table functions and operators like Unnest that
- * require input before they can produce output fall into this category.
- * Table functions can be associated with a Lateral operator in which case they simultaneously operate on the
- * same row as the Lateral operator. In this case the LateralContract member is not null and the table function uses the
- * lateral contract to keep in sync with the Lateral operator.
+ * Implements AbstractUnaryRecordBatch for operators that do not have an incoming
+ * record batch available at creation time; the input is typically set up a few
+ * steps after creation. Table functions and operators like Unnest that require
+ * input before they can produce output fall into this category. Table functions
+ * can be associated with a Lateral operator in which case they simultaneously
+ * operate on the same row as the Lateral operator. In this case the
+ * LateralContract member is not null and the table function uses the lateral
+ * contract to keep in sync with the Lateral operator.
+ *
  * @param <T>
  */
 public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperator> extends
-    AbstractUnaryRecordBatch<T> implements TableFunctionContract{
+    AbstractUnaryRecordBatch<T> implements TableFunctionContract {
 
   protected RecordBatch incoming;
   protected LateralContract lateral;
@@ -48,12 +51,14 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato
     return incoming;
   }
 
+  @Override
   public void setIncoming(RecordBatch incoming) {
     Preconditions.checkArgument(this.incoming == null, "Incoming is already set. setIncoming cannot be called "
         + "multiple times.");
     this.incoming = incoming;
   }
 
+  @Override
   public void setIncoming(LateralContract incoming) {
     setIncoming(incoming.getIncoming());
     lateral = incoming;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index 78acb10..5997a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -18,16 +18,13 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * The base class for operators that have a single input. The concrete implementations provide the
+ * Base class for operators that have a single input. The concrete implementations provide the
  * input by implementing the getIncoming() method
  * Known implementations:  AbstractSingleRecordBatch and AbstractTableFunctionRecordBatch.
  * @see org.apache.drill.exec.record.AbstractRecordBatch
@@ -36,7 +33,6 @@ import org.slf4j.LoggerFactory;
  * @param <T>
  */
 public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
-  private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private IterOutcome lastKnownOutcome;
@@ -98,11 +94,6 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
           if (!setupNewSchema()) {
             upstream = IterOutcome.OK;
           }
-        } catch (SchemaChangeException ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
         } finally {
           stats.stopSetup();
         }
@@ -130,7 +121,7 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
     }
   }
 
-  protected abstract boolean setupNewSchema() throws SchemaChangeException;
+  protected abstract boolean setupNewSchema();
   protected abstract IterOutcome doWork();
 
   /**
@@ -150,7 +141,7 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
    */
   protected IterOutcome handleNullInput() {
     container.buildSchema(SelectionVectorMode.NONE);
-    container.setRecordCount(0);
+    container.setEmpty();
     return IterOutcome.NONE;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 1f06710..064c601 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -75,7 +75,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
-   * @throws SchemaChangeException
-   *   TODO:  Clean:  DRILL-2933  load(...) never actually throws SchemaChangeException.
    */
-  public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
+  @SuppressWarnings("resource")
+  public boolean load(RecordBatchDef def, DrillBuf buf) {
     if (logger.isTraceEnabled()) {
       logger.trace("Loading record batch with def {} and data {}", def, buf);
       logger.trace("Load, ThreadID: {}\n{}", Thread.currentThread().getId(), new StackTrace());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index c588f25..09200f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -31,8 +31,8 @@ public class SimpleRecordBatch implements RecordBatch {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
 
-  private VectorContainer container;
-  private FragmentContext context;
+  private final VectorContainer container;
+  private final FragmentContext context;
 
   public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
     this.container = container;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
index 12b9053..f8fcb3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
 
 /**
  * VectorAccessible is an interface. Yet, several operations are done
@@ -37,6 +38,12 @@ public class VectorAccessibleUtilities {
     }
   }
 
+  public static void clear(Iterable<ValueVector> iter) {
+    for (final ValueVector v : iter) {
+      v.clear();
+    }
+  }
+
   public static void setValueCount(VectorAccessible va, int count) {
     for (VectorWrapper<?> w: va) {
       w.getValueVector().getMutator().setValueCount(count);
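
The new overload complements the VectorAccessible version for vectors that
are not wrapped in a container. A hypothetical call site:

    void releaseDetached(List<ValueVector> detached) {
      // Frees each vector's underlying buffer in one call
      VectorAccessibleUtilities.clear(detached);
    }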
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 4f4f88d..ffc2854 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -19,21 +19,27 @@ package org.apache.drill.exec.record.selection;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.DeadBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SelectionVector4 implements AutoCloseable {
+  static final Logger logger = LoggerFactory.getLogger(SelectionVector4.class);
 
   private ByteBuf data;
   private int recordCount;
   private int start;
   private int length;
 
-  public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+  public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) {
     if (recordCount > Integer.MAX_VALUE / 4) {
-      throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
-          "You requested an allocation of %d bytes.", recordCount * 4L));
+      throw UserException.internalError(null)
+          .message(String.format(
+            "Currently, Drill can only support allocations up to 2gb in size. " +
+            "Query requested an allocation of %d bytes.", recordCount * 4L))
+          .build(logger);
     }
     this.recordCount = recordCount;
     this.start = 0;
@@ -43,8 +49,11 @@ public class SelectionVector4 implements AutoCloseable {
 
   public SelectionVector4(BufferAllocator allocator, int recordCount) {
     if (recordCount > Integer.MAX_VALUE / 4) {
-      throw new IllegalStateException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
-          "You requested an allocation of %d bytes.", recordCount * 4L));
+      throw UserException.internalError(null)
+          .message(String.format(
+              "Currently, Drill can only support allocations up to 2gb in size. " +
+              "Query requested an allocation of %d bytes.", recordCount * 4L))
+         .build(logger);
     }
     this.recordCount = recordCount;
     this.start = 0;
@@ -82,23 +91,24 @@ public class SelectionVector4 implements AutoCloseable {
   }
 
   /**
-   * Caution: This method shares the underlying buffer between this vector and the newly created one.
-   * @param batchRecordCount this will be used when creating the new vector
+   * Caution: This method shares the underlying buffer between this vector and
+   * the newly created one.
+   *
+   * @param batchRecordCount
+   *          this will be used when creating the new vector
    * @return Newly created single batch SelectionVector4.
    */
   public SelectionVector4 createNewWrapperCurrent(int batchRecordCount) {
-    try {
-      data.retain();
-      final SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
-      sv4.start = this.start;
-      return sv4;
-    } catch (SchemaChangeException e) {
-      throw new IllegalStateException("This shouldn't happen.");
-    }
+    data.retain();
+    final SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
+    sv4.start = this.start;
+    return sv4;
   }
 
   /**
-   * Caution: This method shares the underlying buffer between this vector and the newly created one.
+   * Caution: This method shares the underlying buffer between this vector and
+   * the newly created one.
+   *
    * @return Newly created single batch SelectionVector4.
    */
   public SelectionVector4 createNewWrapperCurrent() {
@@ -106,19 +116,15 @@ public class SelectionVector4 implements AutoCloseable {
   }
 
   public boolean next() {
-//    logger.debug("Next called. Start: {}, Length: {}, recordCount: " + recordCount, start, length);
-
     if (!hasNext()) {
       start = recordCount;
       length = 0;
-//      logger.debug("Setting count to zero.");
       return false;
     }
 
     start = start + length;
     int newEnd = Math.min(start + length, recordCount);
     length = newEnd - start;
-//    logger.debug("New start {}, new length {}", start, length);
     return true;
   }
 
@@ -142,8 +148,8 @@ public class SelectionVector4 implements AutoCloseable {
     this.recordCount = fromSV4.getTotalCount();
     this.length = fromSV4.getCount();
     this.data = fromSV4.getData();
-    // Need to retain the data buffer since if fromSV4 clears out the buffer it's not actually released unless the
-    // copied SV4 has also released it
+    // Need to retain the data buffer since if fromSV4 clears out the buffer
+    // it's not actually released unless the copied SV4 has also released it
     if (data != DeadBuf.DEAD_BUFFER) {
       this.data.retain();
     }
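
For reference, the limit both constructors enforce follows from the SV4
layout: each entry is a single 4-byte int, and one buffer allocation cannot
exceed Integer.MAX_VALUE bytes. The same arithmetic as a standalone sketch
(method name is illustrative):

    static void checkSv4Capacity(int recordCount) {
      // 4 bytes per entry; guard before computing recordCount * 4
      if (recordCount > Integer.MAX_VALUE / 4) {
        throw new IllegalArgumentException(String.format(
            "An SV4 for %d records needs %d bytes; the maximum is %d",
            recordCount, recordCount * 4L, Integer.MAX_VALUE));
      }
    }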
diff --git a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
similarity index 74%
rename from exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
index 9c285d4..ee0bc2d 100644
--- a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
@@ -15,24 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-<@pp.dropOutputFile />
-<@pp.changeOutputFile name="org/apache/drill/exec/store/StatisticsRecordWriter.java" />
-<#include "/@includes/license.ftl" />
-
 package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
-import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
 import java.util.Map;
 
-/*
- * This class is generated using freemarker and the ${.template_name} template.
- */
-
-/** StatisticsRecordWriter interface. */
 public interface StatisticsRecordWriter extends StatisticsRecordCollector {
 
   /**
@@ -41,14 +30,14 @@ public interface StatisticsRecordWriter extends StatisticsRecordCollector {
    * @param writerOptions Contains key, value pair of settings.
    * @throws IOException
    */
-  void init(Map<String, String> writerOptions) throws IOException;
+  void init(Map<String, String> writerOptions);
 
   /**
    * Update the schema in RecordWriter. Called at least once before starting writing the records.
    * @param batch
    * @throws IOException
    */
-  void updateSchema(VectorAccessible batch) throws IOException;
+  void updateSchema(VectorAccessible batch);
 
   /**
    * Check if the writer should start a new partition, and if so, start a new partition
@@ -66,6 +55,6 @@ public interface StatisticsRecordWriter extends StatisticsRecordCollector {
    * @throws IOException
    */
   void flushBlockingWriter() throws IOException;
-  void abort() throws IOException;
-  void cleanup() throws IOException;
-}
\ No newline at end of file
+  void abort();
+  void cleanup();
+}
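
With the checked IOException dropped from init(), updateSchema(), abort()
and cleanup(), implementations surface setup problems as unchecked
UserExceptions instead. A sketch of the revised init() pattern (fields and
message are illustrative; the real example is the JSON writer below):

    @Override
    public void init(Map<String, String> writerOptions) {
      try {
        fs = FileSystem.get(fsConf);  // fields assumed from the writer
      } catch (IOException e) {
        throw UserException.dataWriteError(e)
            .addContext("Cannot initialize statistics writer")
            .build(logger);
      }
    }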
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 4a3d431..b73fa76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -48,10 +48,8 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
   private Path cleanUpLocation;
   private String location;
-  private boolean append;
   private String prefix;
 
-  private String fieldDelimiter;
   private String extension;
   private boolean useExtendedOutput;
 
@@ -62,10 +60,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private final JsonFactory factory = new JsonFactory();
   private final StorageStrategy storageStrategy;
 
-  // Record write status
-  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
-
-  private Configuration fsConf;
+  private final Configuration fsConf;
 
   public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
@@ -76,7 +71,6 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
-    this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
     this.useExtendedOutput = Boolean.parseBoolean(writerOptions.get("extended"));
     this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
@@ -244,13 +238,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   @Override
   public void startRecord() throws IOException {
     gen.writeStartObject();
-    fRecordStarted = true;
   }
 
   @Override
   public void endRecord() throws IOException {
     gen.writeEndObject();
-    fRecordStarted = false;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
index 51cdbe2..8858e34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StatisticsRecordWriter;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.JSONBaseStatisticsRecordWriter;
@@ -40,13 +41,13 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
   private String prefix;
   private String extension;
   private FileSystem fs = null;
-  private Configuration fsConf;
-  private FormatPlugin formatPlugin;
+  private final Configuration fsConf;
+  private final FormatPlugin formatPlugin;
   private Path fileName = null;
 
   private long recordsWritten = -1;
 
-  private StatisticsCollectorImpl statisticsCollector = new StatisticsCollectorImpl();
+  private final StatisticsCollectorImpl statisticsCollector = new StatisticsCollectorImpl();
 
   public JsonStatisticsRecordWriter(Configuration fsConf, FormatPlugin formatPlugin) {
     this.fsConf = fsConf;
@@ -54,7 +55,7 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
   }
 
   @Override
-  public void init(Map<String, String> writerOptions) throws IOException {
+  public void init(Map<String, String> writerOptions) {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
     this.extension = writerOptions.get("extension");
@@ -70,8 +71,9 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
         fs.delete(fileName, false);
       }
     } catch (IOException ex) {
-      logger.error("Unable to delete tmp file (corrupt): " + fileName, ex);
-      throw ex;
+      throw UserException.dataWriteError(ex)
+        .addContext("Unable to delete tmp statistics file", fileName)
+        .build(logger);
     }
     try {
       // Delete the tmp file and .stats.drill on exit. After writing out the permanent file
@@ -81,8 +83,9 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
       fs.deleteOnExit(new Path(location));
       logger.debug("Created file: {}", fileName);
     } catch (IOException ex) {
-      logger.error("Unable to create file: " + fileName, ex);
-      throw ex;
+      throw UserException.dataWriteError(ex)
+        .addContext("Unable to create stistics file", fileName)
+        .build(logger);
     }
   }
 
@@ -186,13 +189,13 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
   }
 
   @Override
-  public void abort() throws IOException {
+  public void abort() {
     // Invoke cleanup to clear any .tmp files and/or empty statistics directory
     cleanup();
   }
 
   @Override
-  public void cleanup() throws IOException {
+  public void cleanup() {
     Path permFileName = new Path(location, prefix + "." + extension);
     try {
       // Remove the .tmp file, if any
@@ -206,8 +209,8 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
         logger.debug("Deleted directory: {}", location);
       }
     } catch (IOException ex) {
-      logger.error("Unable to delete tmp file: " + fileName, ex);
-      throw ex;
+      // Warn but continue
+      logger.warn("Unable to delete tmp satistics file: " + fileName, ex);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 161fa83..3b9ed1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -55,9 +55,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
 
 /**
- * <h2>Overview</h2>
- * <p>
- * Responsible for running a single fragment on a single Drillbit.
+ * Runs a single fragment on a single Drillbit.
  * Listens/responds to status request and cancellation messages.
  * </p>
  * <h2>Theory of Operation</h2>
@@ -143,6 +141,7 @@ public class FragmentExecutor implements Runnable {
 
   private volatile RootExec root;
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+
   /**
    * Holds all of the messages sent by downstream receivers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
    * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
@@ -209,11 +208,11 @@ public class FragmentExecutor implements Runnable {
      * before this check. This caused a concurrent modification exception as the list of operator
      * stats is iterated over while collecting info, and added to while building the operator tree.
      */
-    if (fragmentState.get() != FragmentState.RUNNING) {
+    if (fragmentState.get() == FragmentState.RUNNING) {
+      return statusReporter.getStatus(FragmentState.RUNNING);
+    } else {
       return null;
     }
-
-    return statusReporter.getStatus(FragmentState.RUNNING);
   }
 
   /**
@@ -237,32 +236,31 @@ public class FragmentExecutor implements Runnable {
   }
 
   private void cleanup(FragmentState state) {
-    if (root != null && fragmentState.get() == FragmentState.FAILED) {
-      root.dumpBatches();
-    }
-
     closeOutResources();
 
     updateState(state);
     // send the final state of the fragment. only the main execution thread can send the final state and it can
     // only be sent once.
     sendFinalState();
-
   }
 
   /**
-   * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
-   * the one running this runnable(). Also, this method can be called multiple times.
+   * Resume all the pauses within the current context. Note that this method
+   * will be called from threads *other* than the one running this runnable().
+   * Also, this method can be called multiple times.
    */
   public synchronized void unpause() {
     fragmentContext.getExecutionControls().unpauseAll();
   }
 
   /**
-   * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
-   * called in the case that a limit query is executed.
+   * Inform this fragment that one of its downstream partners no longer needs
+   * additional records. This is most commonly called in the case that a limit
+   * query is executed.
    *
-   * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+   * @param handle
+   *          The downstream FragmentHandle of the Fragment that needs no more
+   *          records from this Fragment.
    */
   public void receivingFragmentFinished(final FragmentHandle handle) {
     eventProcessor.receiverFinished(handle);
@@ -283,7 +281,6 @@ public class FragmentExecutor implements Runnable {
     final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
 
     try {
-
       myThread.setName(newThreadName);
 
       // if we didn't get the root operator when the executor was created, create it now.
@@ -337,6 +334,7 @@ public class FragmentExecutor implements Runnable {
       // Ignore: indicates query cancelled by this executor
     } catch (OutOfMemoryError | OutOfMemoryException e) {
       if (FailureUtils.isDirectMemoryOOM(e)) {
+        root.dumpBatches(e);
         fail(UserException.memoryError(e).build(logger));
       } else {
         // we have a heap out of memory error. The JVM is unstable, exit.
@@ -346,6 +344,7 @@ public class FragmentExecutor implements Runnable {
       // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
       logger.trace("Interrupted root: {}", root, e);
     } catch (Throwable t) {
+      root.dumpBatches(t);
       fail(t);
     } finally {
 
@@ -363,7 +362,6 @@ public class FragmentExecutor implements Runnable {
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       myThread.setName(originalThreadName);
-
     }
   }
 
@@ -400,7 +398,6 @@ public class FragmentExecutor implements Runnable {
     statusReporter.close();
   }
 
-
   private void closeOutResources() {
 
     // first close the operators and release all memory.
@@ -416,7 +413,6 @@ public class FragmentExecutor implements Runnable {
 
     // then close the fragment context.
     fragmentContext.close();
-
   }
 
   private void warnStateChange(final FragmentState current, final FragmentState target) {
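
The net effect of the run() changes: batches are now dumped on the failure
path itself, with the throwable in hand, rather than unconditionally in
cleanup(). Condensed shape of the new error handling (paraphrased from the
hunks above; the driver call is hypothetical and the heap-OOM exit path is
elided):

    try {
      runFragment();  // hypothetical: drive the operator tree
    } catch (OutOfMemoryError | OutOfMemoryException e) {
      if (FailureUtils.isDirectMemoryOOM(e)) {
        root.dumpBatches(e);  // record failing operator state first
        fail(UserException.memoryError(e).build(logger));
      } else {
        throw e;              // heap OOM: JVM unstable, exit
      }
    } catch (Throwable t) {
      root.dumpBatches(t);    // identify the leaf-most operator
      fail(t);
    }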
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 2fc9158..054cdee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -126,7 +126,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
   @Override
   public void close() {
     container.clear();
-    container.setRecordCount(0);
+    container.setEmpty();
     currentContainerIndex = 0;
     currentOutcomeIndex = 0;
     if (sv2 != null) {
@@ -291,8 +291,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void dump() {
-  }
+  public void dump() { }
 
   public static class Builder {
     private final List<RowSet> rowSets = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 68cf18f..8f0d677 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -113,8 +113,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @Override
-  public void dumpBatches() {
-    screenRoot.dumpBatches();
+  public void dumpBatches(Throwable t) {
+    screenRoot.dumpBatches(t);
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java
new file mode 100644
index 0000000..fd8f2ac
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * Test the function which finds the leaf-most operator within
+ * an exception call stack. Does the tests using dummy classes
+ * (which is why the stack analyzer function is parameterized.)
+ */
+public class TestStackAnalyzer {
+
+  private static class OperA {
+    public void throwNow() {
+      throw new RuntimeException();
+    }
+
+    public void throwIndirect() {
+      throwNow();
+    }
+
+    public void throwViaB(OperB b) {
+      b.throwIndirect();
+    }
+
+    public void throwAfterB(OperB b) {
+      new RandomC().throwAfterB(b);
+    }
+  }
+
+  private static class OperB {
+    public void throwNow() {
+      throw new RuntimeException();
+    }
+
+    public void throwIndirect() {
+      throwNow();
+    }
+
+    public void throwAfterB() {
+      new RandomC().throwNow();
+    }
+  }
+
+  private static class RandomC {
+    public void throwNow() {
+      throw new RuntimeException();
+    }
+
+    public void throwAfterB(OperB b) {
+      b.throwAfterB();
+    }
+  }
+
+  @Test
+  public void testEmptyStack() {
+    try {
+      throw new RuntimeException();
+    } catch (RuntimeException e) {
+      assertNull(BaseRootExec.findLeaf(Collections.emptyList(), e));
+    }
+  }
+
+  @Test
+  public void testOneLevel() {
+    OperA a = new OperA();
+    try {
+      a.throwNow();
+    } catch (RuntimeException e) {
+      List<Object> ops = Collections.singletonList(a);
+      assertSame(a, BaseRootExec.findLeaf(ops, e));
+    }
+  }
+
+  @Test
+  public void testOneLevelTwoDeep() {
+    OperA a = new OperA();
+    try {
+      a.throwIndirect();
+    } catch (RuntimeException e) {
+      List<Object> ops = Collections.singletonList(a);
+      assertSame(a, BaseRootExec.findLeaf(ops, e));
+    }
+  }
+
+  @Test
+  public void testTwoLevels() {
+    OperA a = new OperA();
+    OperB b = new OperB();
+    try {
+      a.throwViaB(b);
+    } catch (RuntimeException e) {
+      List<Object> ops = Arrays.asList(a, b);
+      assertSame(b, BaseRootExec.findLeaf(ops, e));
+    }
+  }
+
+  @Test
+  public void testTwoLevelsWithExtra() {
+    OperA a = new OperA();
+    OperB b = new OperB();
+    try {
+      a.throwAfterB(b);
+    } catch (RuntimeException e) {
+      List<Object> ops = Arrays.asList(a, b);
+      assertSame(b, BaseRootExec.findLeaf(ops, e));
+    }
+  }
+}
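
The tests above pin down the contract of findLeaf(): scanning the stack
trace from the throw point downward, return the first frame whose declaring
class belongs to one of the supplied operators, so the leaf-most operator
wins even when non-operator helper frames (RandomC) sit in between. A
plausible sketch with that behavior (the real implementation lives in
BaseRootExec and may differ in detail):

    static Object findLeafSketch(List<Object> ops, Throwable e) {
      for (StackTraceElement frame : e.getStackTrace()) {  // top-down
        for (Object op : ops) {
          if (op.getClass().getName().equals(frame.getClassName())) {
            return op;  // first operator frame seen is the leaf-most
          }
        }
      }
      return null;      // no operator appears on the stack
    }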
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 7ef94f7..9e47915 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -140,7 +140,7 @@ public class TestPartitionSender extends PlanTestBase {
     final SelectionVector4 sv = Mockito.mock(SelectionVector4.class, "SelectionVector4");
     Mockito.when(sv.getCount()).thenReturn(100);
     Mockito.when(sv.getTotalCount()).thenReturn(100);
-    for (int i = 0; i < 100; i++ ) {
+    for (int i = 0; i < 100; i++) {
       Mockito.when(sv.get(i)).thenReturn(i);
     }
 
@@ -165,8 +165,8 @@ public class TestPartitionSender extends PlanTestBase {
 
     // get HashToRandomExchange physical operator
     HashToRandomExchange hashToRandomExchange = null;
-    for ( PhysicalOperator operator : operators) {
-      if ( operator instanceof HashToRandomExchange) {
+    for (PhysicalOperator operator : operators) {
+      if (operator instanceof HashToRandomExchange) {
         hashToRandomExchange = (HashToRandomExchange) operator;
         break;
       }
@@ -241,7 +241,7 @@ public class TestPartitionSender extends PlanTestBase {
         final int actualThreads = DRILLBITS_COUNT > expectedThreadsCount ? expectedThreadsCount : DRILLBITS_COUNT;
         assertEquals("Number of partitioners", actualThreads, partitioners.size());
 
-        for ( int i = 0; i < mfEndPoints.size(); i++) {
+        for (int i = 0; i < mfEndPoints.size(); i++) {
           assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i));
         }
 
@@ -249,10 +249,11 @@ public class TestPartitionSender extends PlanTestBase {
         boolean isFirst = true;
         int prevBatchCountSize = 0;
         int batchCountSize = 0;
-        for (Partitioner part : partitioners ) {
+        for (Partitioner part : partitioners) {
+          @SuppressWarnings("unchecked")
           final List<PartitionOutgoingBatch> outBatch = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
           batchCountSize = outBatch.size();
-          if ( !isFirst ) {
+          if (!isFirst) {
             assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1);
           } else {
             isFirst = false;
@@ -266,7 +267,7 @@ public class TestPartitionSender extends PlanTestBase {
         } finally {
           partionSenderRootExec.getStats().stopProcessing();
         }
-        if ( actualThreads == 1 ) {
+        if (actualThreads == 1) {
           assertEquals("With single thread parent and child waitNanos should match", partitioners.get(0).getStats().getWaitNanos(), partionSenderRootExec.getStats().getWaitNanos());
         }
 
@@ -274,11 +275,12 @@ public class TestPartitionSender extends PlanTestBase {
         partitioners = partDecor.getPartitioners();
         isFirst = true;
         // since we have fake Nullvector distribution is skewed
-        for (Partitioner part : partitioners ) {
+        for (Partitioner part : partitioners) {
+          @SuppressWarnings("unchecked")
           final List<PartitionOutgoingBatch> outBatches = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
-          for (PartitionOutgoingBatch partOutBatch : outBatches ) {
+          for (PartitionOutgoingBatch partOutBatch : outBatches) {
             final int recordCount = ((VectorAccessible) partOutBatch).getRecordCount();
-            if ( isFirst ) {
+            if (isFirst) {
               assertEquals("RecordCount", 100, recordCount);
               isFirst = false;
             } else {
@@ -296,8 +298,8 @@ public class TestPartitionSender extends PlanTestBase {
           final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder();
           partionSenderRootExec.getStats().addAllMetrics(oPBuilder);
           final List<MetricValue> metrics = oPBuilder.getMetricList();
-          for ( MetricValue metric : metrics) {
-            if ( Metric.BYTES_SENT.metricId() == metric.getMetricId() ) {
+          for (MetricValue metric : metrics) {
+            if (Metric.BYTES_SENT.metricId() == metric.getMetricId()) {
               assertEquals("Should add metricValue irrespective of exception", 5*actualThreads, metric.getLongValue());
             }
             if (Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) {
@@ -327,7 +329,7 @@ public class TestPartitionSender extends PlanTestBase {
     int numberPartitions;
     int k = 0;
     final Random rand = new Random();
-    while ( k < 1000 ) {
+    while (k < 1000) {
       outGoingBatchCount = rand.nextInt(1000)+1;
       numberPartitions = rand.nextInt(32)+1;
       final int actualPartitions = outGoingBatchCount > numberPartitions ? numberPartitions : outGoingBatchCount;
@@ -339,11 +341,11 @@ public class TestPartitionSender extends PlanTestBase {
       for (int i = 0; i < actualPartitions; i++) {
         startIndex = endIndex;
         endIndex = startIndex + divisor;
-        if ( i < longTail ) {
+        if (i < longTail) {
           endIndex++;
         }
       }
-      assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount );
+      assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount);
       k++;
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index b3e1d8e..0baf0a0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -62,14 +62,12 @@ import static org.junit.Assert.fail;
 @Category(OperatorTest.class)
 public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
-
   // Operator Context for mock batch
   public static OperatorContext operatorContext;
 
   public static PhysicalOperator mockPopConfig;
   public static LateralJoinPOP ljPopConfig;
 
-
   @BeforeClass public static void setUpBeforeClass() throws Exception {
     mockPopConfig = new MockStorePOP(null);
     ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
@@ -112,7 +110,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   @Test
@@ -146,7 +143,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   @Test
@@ -167,7 +163,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   @Test
@@ -198,7 +193,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   @Test
@@ -246,7 +240,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   @Test
@@ -295,7 +288,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
   private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
@@ -437,7 +429,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } finally {
       fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
     }
-
   }
 
   @Test
@@ -495,7 +486,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } finally {
       fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
     }
-
   }
 
   @Test
@@ -528,10 +518,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
 
-
   // test unnest for various input conditions without invoking kill
   private <T> void testUnnest(
       TupleMetadata[] incomingSchemas,
@@ -559,7 +547,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
     int rowNumber = 0;
     int batchNum = 0;
-    for ( Object[] recordBatch : data) {
+    for (Object[] recordBatch : data) {
       RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
       for ( Object rowData : recordBatch) {
         rowSetBuilder.addRow(++rowNumber, rowData);
@@ -604,28 +592,29 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
     // Simulate the pipeline by calling next on the incoming
 
-    // results is an array ot batches, each batch being an array of output vectors.
+    // results is an array of batches, each batch being an array of output vectors.
     List<List<ValueVector> > resultList = new ArrayList<>();
     List<List<ValueVector> > results = null;
     int batchesProcessed = 0;
     try{
-    try {
-      while (!isTerminal(lateralJoinBatch.next())) {
-        if (lateralJoinBatch.getRecordCount() > 0) {
-          addBatchToResults(resultList, lateralJoinBatch);
-        }
-        batchesProcessed++;
-        if (batchesProcessed == execKill) {
-          lateralJoinBatch.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
-          lateralJoinBatch.kill(true);
+      try {
+        while (!isTerminal(lateralJoinBatch.next())) {
+          if (lateralJoinBatch.getRecordCount() > 0) {
+            addBatchToResults(resultList, lateralJoinBatch);
+          }
+          batchesProcessed++;
+          if (batchesProcessed == execKill) {
+            // Errors are reported by throwing an exception.
+            // Simulate by skipping out of the loop
+            break;
+          }
+          // else nothing to do
         }
-        // else nothing to do
+      } catch (UserException e) {
+        throw e;
+      } catch (Exception e) {
+        fail(e.getMessage());
       }
-    } catch (UserException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new Exception ("Test failed to execute lateralJoinBatch.next() because: " + e.getMessage());
-    }
 
       // Check results against baseline
       results = resultList;
@@ -633,7 +622,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       int batchIndex = 0;
       int vectorIndex = 0;
       //int valueIndex = 0;
-      for ( List<ValueVector> batch: results) {
+      for (List<ValueVector> batch: results) {
         int vectorCount= batch.size();
         int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
         expectedVectorCount += baseline[batchIndex].length;
@@ -698,7 +687,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         rowSet.clear();
       }
     }
-
   }
 
   /**
@@ -849,8 +837,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
    * @param <T>
    * @throws Exception
    */
-
-
   private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
       RecordBatch.IterOutcome[] iterOutcomes,
       int execKill, // number of batches after which to kill the execution (!)
@@ -1016,7 +1002,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         rowSet.clear();
       }
     }
-
   }
 
   @Test
@@ -1037,8 +1022,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
-
   }
-
 }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 83627c6..206221d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,7 +47,6 @@ import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.ZookeeperTestUtil;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
@@ -249,13 +248,9 @@ public class TestDrillbitResilience extends DrillTest {
           public void rowArrived(final QueryDataBatch queryResultBatch) {
             // load the single record
             final QueryData queryData = queryResultBatch.getHeader();
-            try {
-              loader.load(queryData.getDef(), queryResultBatch.getData());
-              // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-              // SchemaChangeException, so check/clean catch clause below.
-            } catch (final SchemaChangeException e) {
-              fail(e.toString());
-            }
+            // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
+            // SchemaChangeException.
+            loader.load(queryData.getDef(), queryResultBatch.getData());
             assertEquals(1, loader.getRecordCount());
 
             // there should only be one column
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 6d9be9b..c31807b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -48,7 +47,7 @@ public class ParquetResultListener implements UserResultsListener {
   int count = 0;
   int totalRecords;
 
-  private boolean testValues;
+  private final boolean testValues;
   private final BufferAllocator allocator;
 
   int batchCounter = 1;
@@ -109,13 +108,7 @@ public class ParquetResultListener implements UserResultsListener {
     count += result.getHeader().getRowCount();
     boolean schemaChanged = false;
     final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
-    try {
-      schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
-      // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-      // SchemaChangeException, so check/clean catch clause below.
-    } catch (SchemaChangeException e) {
-      throw new RuntimeException(e);
-    }
+    schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
 
     // used to make sure each vector in the batch has the same number of records
     int valueCount = batchLoader.getRecordCount();
@@ -124,7 +117,7 @@ public class ParquetResultListener implements UserResultsListener {
     if (schemaChanged) {
     } // do not believe any change is needed for when the schema changes, with the current mock scan use case
 
-    for (final VectorWrapper vw : batchLoader) {
+    for (final VectorWrapper<?> vw : batchLoader) {
       final ValueVector vv = vw.getValueVector();
       currentField = props.fields.get(vv.getField().getName());
       if (!valuesChecked.containsKey(vv.getField().getName())) {
@@ -210,7 +203,7 @@ public class ParquetResultListener implements UserResultsListener {
       if (i % 50 == 0) {
         final StringBuilder sb = new StringBuilder();
 
-        for (VectorWrapper vw : batchLoader) {
+        for (VectorWrapper<?> vw : batchLoader) {
           ValueVector v = vw.getValueVector();
           sb.append(Strings.padStart(v.getField().getName(), 20, ' ') + " ");
         }
@@ -220,7 +213,7 @@ public class ParquetResultListener implements UserResultsListener {
 
       final StringBuilder sb = new StringBuilder();
 
-      for (final VectorWrapper vw : batchLoader) {
+      for (final VectorWrapper<?> vw : batchLoader) {
         final ValueVector v = vw.getValueVector();
         Object o = v.getAccessor().getObject(i);
         if (o instanceof byte[]) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 2fa8c75..a19fd74 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -63,11 +63,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An object to encapsulate the options for a Drill unit test, as well as the execution methods to perform the tests and
- * validation of results.
- *
- * To construct an instance easily, look at the TestBuilder class. From an implementation of
- * the BaseTestQuery class, and instance of the builder is accessible through the testBuilder() method.
+ * Encapsulates the options for a Drill unit test, as well as the execution
+ * methods to perform the tests and validation of results.
+ * <p>
+ * To construct an instance easily, look at the TestBuilder class. From an
+ * implementation of the BaseTestQuery class, an instance of the builder is
+ * accessible through the testBuilder() method.
  */
 public class DrillTestWrapper {
 
@@ -98,39 +99,39 @@ public class DrillTestWrapper {
   // one case where the setup for the baseline is driven by the test query results, and this is implicit type enforcement
   // for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information
   // from the test query.
-  private TestBuilder testBuilder;
+  private final TestBuilder testBuilder;
   /**
    * Test query to run. Type of object depends on the {@link #queryType}
    */
-  private Object query;
+  private final Object query;
   // The type of query provided
-  private UserBitShared.QueryType queryType;
+  private final UserBitShared.QueryType queryType;
   // The type of query provided for the baseline
-  private UserBitShared.QueryType baselineQueryType;
+  private final UserBitShared.QueryType baselineQueryType;
   // should ordering be enforced in the baseline check
-  private boolean ordered;
-  private TestServices services;
+  private final boolean ordered;
+  private final TestServices services;
   // queries to run before the baseline or test queries, can be used to set options
-  private String baselineOptionSettingQueries;
-  private String testOptionSettingQueries;
+  private final String baselineOptionSettingQueries;
+  private final String testOptionSettingQueries;
   // allow approximate equality tests for number types
-  private boolean approximateEquality;
+  private final boolean approximateEquality;
   // tolerance for approximate equality tests defined as |Expected - Actual|/|Expected| <= Tolerance
-  private double tolerance;
+  private final double tolerance;
   // two different methods are available for comparing ordered results, the default reads all of the records
   // into giant lists of objects, like one giant on-heap batch of 'vectors'
   // this flag enables the other approach which iterates through a hyper batch for the test query results and baseline
   // while this does work faster and use less memory, it can be harder to debug as all of the elements are not in a
   // single list
-  private boolean highPerformanceComparison;
+  private final boolean highPerformanceComparison;
   // if the baseline is a single option test writers can provide the baseline values and columns
   // without creating a file, these are provided to the builder in the baselineValues() and baselineColumns() methods
   // and translated into a map in the builder
-  private String[] baselineColumns;
-  private List<Map<String, Object>> baselineRecords;
+  private final String[] baselineColumns;
+  private final List<Map<String, Object>> baselineRecords;
 
-  private int expectedNumBatches;
-  private int expectedNumRecords;
+  private final int expectedNumBatches;
+  private final int expectedNumRecords;
 
   public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
       String baselineOptionSettingQueries, String testOptionSettingQueries,
@@ -312,11 +313,7 @@ public class DrillTestWrapper {
           }
           batchLoader.clear();
           QueryDataBatch batch = dataBatches.get(index);
-          try {
-            batchLoader.load(batch.getHeader().getDef(), batch.getData());
-          } catch (SchemaChangeException e) {
-            throw new RuntimeException(e);
-          }
+          batchLoader.load(batch.getHeader().getDef(), batch.getData());
           return batchLoader;
         }
 
@@ -421,6 +418,7 @@ public class DrillTestWrapper {
           case FOUR_BYTE:
             sv4 = loader.getSelectionVector4();
             break;
+          default:
         }
         if (sv4 != null) {
           for (int j = 0; j < sv4.getCount(); j++) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 2352d0a..6b0641d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -67,21 +67,18 @@ import static org.junit.Assert.assertEquals;
  * Builder for a Drill query. Provides all types of query formats,
  * and a variety of ways to run the query.
  */
-
 public class QueryBuilder {
 
   /**
    * Listener used to retrieve the query summary (only) asynchronously
    * using a {@link QuerySummaryFuture}.
    */
-
   public static class SummaryOnlyQueryEventListener implements UserResultsListener {
 
     /**
      * The future to be notified. Created here and returned by the
      * query builder.
      */
-
     private final QuerySummaryFuture future;
     private QueryId queryId;
     private int recordCount;
@@ -374,28 +371,24 @@ public class QueryBuilder {
     // Unload the batch and convert to a row set.
 
     RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
-    try {
-      loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
-      resultBatch.release();
-      VectorContainer container = loader.getContainer();
-      container.setRecordCount(loader.getRecordCount());
-
-      // Null results? Drill will return a single batch with no rows
-      // and no columns even if the scan (or other) operator returns
-      // no batches at all. For ease of testing, simply map this null
-      // result set to a null output row set that says "nothing at all
-      // was returned." Note that this is different than an empty result
-      // set which has a schema, but no rows.
-
-      if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
-        container.clear();
-        return null;
-      }
-
-      return DirectRowSet.fromContainer(container);
-    } catch (SchemaChangeException e) {
-      throw new IllegalStateException(e);
+    loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
+    resultBatch.release();
+    VectorContainer container = loader.getContainer();
+    container.setRecordCount(loader.getRecordCount());
+
+    // Null results? Drill will return a single batch with no rows
+    // and no columns even if the scan (or other) operator returns
+    // no batches at all. For ease of testing, simply map this null
+    // result set to a null output row set that says "nothing at all
+    // was returned." Note that this is different than an empty result
+    // set which has a schema, but no rows.
+
+    if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
+      container.clear();
+      return null;
     }
+
+    return DirectRowSet.fromContainer(container);
   }
 
   public QueryRowSetIterator rowSetIterator() {
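
One behavioral note the comment above spells out: a query returning no rows
and no columns now surfaces as a Java null from rowSet(), distinct from an
empty row set that still carries a schema. Hypothetical test usage (query
text is illustrative; assumes a test method declaring throws Exception):

    DirectRowSet result = client.queryBuilder()
        .sql("SELECT * FROM cp.`project/empty.json`")
        .rowSet();
    if (result == null) {
      // nothing at all was returned
    } else {
      result.clear();  // release buffers once verified
    }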
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index 03ae625..c8d8459 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -19,7 +19,6 @@ package org.apache.drill.test;
 
 import java.util.Iterator;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetFormatter;
@@ -87,16 +86,12 @@ public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<Dir
     // Unload the batch and convert to a row set.
 
     final RecordBatchLoader loader = new RecordBatchLoader(allocator);
-    try {
-      loader.load(batch.getHeader().getDef(), batch.getData());
-      batch.release();
-      batch = null;
-      VectorContainer container = loader.getContainer();
-      container.setRecordCount(loader.getRecordCount());
-      return DirectRowSet.fromContainer(container);
-    } catch (SchemaChangeException e) {
-      throw new IllegalStateException(e);
-    }
+    loader.load(batch.getHeader().getDef(), batch.getData());
+    batch.release();
+    batch = null;
+    VectorContainer container = loader.getContainer();
+    container.setRecordCount(loader.getRecordCount());
+    return DirectRowSet.fromContainer(container);
   }
 
   public void printAll() {
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 68c944a..6a6aaf8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -40,7 +40,6 @@ import org.apache.calcite.avatica.util.Cursor;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
@@ -88,13 +87,13 @@ public class DrillCursor implements Cursor {
     // that the _query_ has _terminated_ (not necessarily _completing_
     // normally), while some uses imply that it's some other state of the
     // ResultListener.  Some uses seem redundant.)
-    volatile boolean completed = false;
+    volatile boolean completed;
 
     /** Whether throttling of incoming data is active. */
-    private final AtomicBoolean throttled = new AtomicBoolean( false );
+    private final AtomicBoolean throttled = new AtomicBoolean(false);
     private volatile ConnectionThrottle throttle;
 
-    private volatile boolean closed = false;
+    private volatile boolean closed;
 
     private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
 
@@ -102,7 +101,7 @@ public class DrillCursor implements Cursor {
         Queues.newLinkedBlockingDeque();
 
     private final DrillCursor parent;
-    Stopwatch elapsedTimer = null;
+    Stopwatch elapsedTimer;
 
     /**
      * ...
@@ -111,11 +110,11 @@ public class DrillCursor implements Cursor {
      * @param batchQueueThrottlingThreshold
      *        queue size threshold for throttling server
      */
-    ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold ) {
+    ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold) {
       this.parent = parent;
       instanceId = nextInstanceId++;
       this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
-      logger.debug( "[#{}] Query listener created.", instanceId );
+      logger.debug("[#{}] Query listener created.", instanceId);
     }
 
     /**
@@ -123,9 +122,9 @@ public class DrillCursor implements Cursor {
      * @param  throttle  the "throttlable" object to throttle
      * @return  true if actually started (wasn't throttling already)
      */
-    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
-      final boolean started = throttled.compareAndSet( false, true );
-      if ( started ) {
+    private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
+      final boolean started = throttled.compareAndSet(false, true);
+      if (started) {
         this.throttle = throttle;
         throttle.setAutoRead(false);
       }
@@ -137,8 +136,8 @@ public class DrillCursor implements Cursor {
      * @return  true if actually stopped (was throttling)
      */
     private boolean stopThrottlingIfSo() {
-      final boolean stopped = throttled.compareAndSet( true, false );
-      if ( stopped ) {
+      final boolean stopped = throttled.compareAndSet(true, false);
+      if (stopped) {
         throttle.setAutoRead(true);
         throttle = null;
       }
@@ -147,10 +146,10 @@ public class DrillCursor implements Cursor {
 
     public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
       //Check if a non-zero timeout has been set
-      if ( parent.timeoutInMilliseconds > 0 ) {
+      if (parent.timeoutInMilliseconds > 0) {
         //Identifying remaining in milliseconds to maintain a granularity close to integer value of timeout
         long timeToTimeout = parent.timeoutInMilliseconds - parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS);
-        if ( timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
+        if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
             throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
         }
       } else {
@@ -164,25 +163,25 @@ public class DrillCursor implements Cursor {
 
     @Override
     public void queryIdArrived(QueryId queryId) {
-      logger.debug( "[#{}] Received query ID: {}.",
-                    instanceId, QueryIdHelper.getQueryId( queryId ) );
+      logger.debug("[#{}] Received query ID: {}.",
+                    instanceId, QueryIdHelper.getQueryId(queryId));
       this.queryId = queryId;
     }
 
     @Override
     public void submissionFailed(UserException ex) {
-      logger.debug( "Received query failure:", instanceId, ex );
+      logger.debug("Received query failure:", instanceId, ex);
       this.executionFailureException = ex;
       completed = true;
       close();
-      logger.info( "[#{}] Query failed: ", instanceId, ex );
+      logger.info("[#{}] Query failed: ", instanceId, ex);
     }
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
       lastReceivedBatchNumber++;
-      logger.debug( "[#{}] Received query data batch #{}: {}.",
-                    instanceId, lastReceivedBatchNumber, result );
+      logger.debug("[#{}] Received query data batch #{}: {}.",
+                    instanceId, lastReceivedBatchNumber, result);
 
       // If we're in a closed state, just release the message.
       if (closed) {
@@ -197,10 +196,10 @@ public class DrillCursor implements Cursor {
       batchQueue.add(result);
 
       // Throttle server if queue size has exceeded the threshold.
-      if (batchQueue.size() > batchQueueThrottlingThreshold ) {
-        if ( startThrottlingIfNot( throttle ) ) {
-          logger.debug( "[#{}] Throttling started at queue size {}.",
-                        instanceId, batchQueue.size() );
+      if (batchQueue.size() > batchQueueThrottlingThreshold) {
+        if (startThrottlingIfNot(throttle)) {
+          logger.debug("[#{}] Throttling started at queue size {}.",
+                        instanceId, batchQueue.size());
         }
       }
 
@@ -209,7 +208,7 @@ public class DrillCursor implements Cursor {
 
     @Override
     public void queryCompleted(QueryState state) {
-      logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
+      logger.debug("[#{}] Received query completion: {}.", instanceId, state);
       releaseIfFirst();
       completed = true;
     }
@@ -218,7 +217,6 @@ public class DrillCursor implements Cursor {
       return queryId;
     }
 
-
     /**
      * Gets the next batch of query results from the queue.
      * @return  the next batch, or {@code null} after last batch has been returned
@@ -230,8 +228,8 @@ public class DrillCursor implements Cursor {
     QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
       while (true) {
         if (executionFailureException != null) {
-          logger.debug( "[#{}] Dequeued query failure exception: {}.",
-                        instanceId, executionFailureException );
+          logger.debug("[#{}] Dequeued query failure exception: {}.",
+                        instanceId, executionFailureException);
           throw executionFailureException;
         }
         if (completed && batchQueue.isEmpty()) {
@@ -240,23 +238,23 @@ public class DrillCursor implements Cursor {
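+          // A short poll keeps the loop responsive: each pass re-checks failure, completion, and the query timeout.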
           QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
           if (qdb != null) {
             lastDequeuedBatchNumber++;
-            logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
-                          instanceId, lastDequeuedBatchNumber, qdb );
+            logger.debug("[#{}] Dequeued query data batch #{}: {}.",
+                          instanceId, lastDequeuedBatchNumber, qdb);
 
             // Unthrottle server if queue size has dropped enough below threshold:
-            if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+            if (batchQueue.size() < batchQueueThrottlingThreshold / 2
                  || batchQueue.size() == 0  // (in case threshold < 2)
-                 ) {
-              if ( stopThrottlingIfSo() ) {
-                logger.debug( "[#{}] Throttling stopped at queue size {}.",
-                              instanceId, batchQueue.size() );
+                ) {
+              if (stopThrottlingIfSo()) {
+                logger.debug("[#{}] Throttling stopped at queue size {}.",
+                              instanceId, batchQueue.size());
               }
             }
             return qdb;
           }
 
           // Check and throw SQLTimeoutException
-          if ( parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds ) {
+          if (parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds) {
             throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
           }
         }
@@ -264,11 +262,11 @@ public class DrillCursor implements Cursor {
     }
 
     void close() {
-      logger.debug( "[#{}] Query listener closing.", instanceId );
+      logger.debug("[#{}] Query listener closing.", instanceId);
       closed = true;
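+      // closed is volatile, so a concurrent dataArrived() will see it and release any late batches.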
-      if ( stopThrottlingIfSo() ) {
-        logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
-                      instanceId, batchQueue.size() );
+      if (stopThrottlingIfSo()) {
+        logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
+                      instanceId, batchQueue.size());
       }
       while (!batchQueue.isEmpty()) {
         // Don't bother with query timeout, we're closing the cursor
@@ -283,10 +281,9 @@ public class DrillCursor implements Cursor {
       firstMessageReceived.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
       completed = true;
     }
-
   }
 
-  private static final Logger logger = getLogger( DrillCursor.class );
+  private static final Logger logger = getLogger(DrillCursor.class);
 
   /** JDBC-specified string for unknown catalog, schema, and table names. */
   private static final String UNKNOWN_NAME_STRING = "";
@@ -310,10 +307,10 @@ public class DrillCursor implements Cursor {
   private DrillColumnMetaDataList columnMetaDataList;
 
   /** Whether loadInitialSchema() has been called. */
-  private boolean initialSchemaLoaded = false;
+  private boolean initialSchemaLoaded;
 
   /** Whether after first batch.  (Re skipping spurious empty batches.) */
-  private boolean afterFirstBatch = false;
+  private boolean afterFirstBatch;
 
   /**
    * Whether the next call to {@code this.}{@link #next()} should just return
@@ -329,10 +326,10 @@ public class DrillCursor implements Cursor {
    *   and schema before {@code Statement.execute...(...)} even returns.)
    * </p>
    */
-  private boolean returnTrueForNextCallToNext = false;
+  private boolean returnTrueForNextCallToNext;
 
   /** Whether cursor is after the end of the sequence of records/rows. */
-  private boolean afterLastRow = false;
+  private boolean afterLastRow;
 
   private int currentRowNumber = -1;
   /** Zero-based offset of current record in record batch.
@@ -340,7 +337,7 @@ public class DrillCursor implements Cursor {
   private int currentRecordNumber = -1;
 
   // Track timeout period (zero means no timeout)
-  private long timeoutInMilliseconds = 0L;
+  private long timeoutInMilliseconds;
   private Stopwatch elapsedTimer;
 
   /**
@@ -357,7 +354,7 @@ public class DrillCursor implements Cursor {
     DrillClient client = connection.getClient();
     final int batchQueueThrottlingThreshold =
         client.getConfig().getInt(
-            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
     resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold);
     currentBatchHolder = new RecordBatchLoader(client.getAllocator());
 
@@ -429,10 +426,10 @@ public class DrillCursor implements Cursor {
     final List<Class<?>> getObjectClasses = new ArrayList<>();
     // (Can't use modern for loop because, for some incompletely clear reason,
     // DrillAccessorList blocks iterator() (throwing exception).)
-    for ( int ax = 0; ax < accessors.size(); ax++ ) {
+    for (int ax = 0; ax < accessors.size(); ax++) {
       final AvaticaDrillSqlAccessor accessor =
-          accessors.get( ax );
-      getObjectClasses.add( accessor.getObjectClass() );
+          accessors.get(ax);
+      getObjectClasses.add(accessor.getObjectClass());
     }
 
     // Update metadata for result set.
@@ -441,7 +438,7 @@ public class DrillCursor implements Cursor {
         UNKNOWN_NAME_STRING,  // schema name
         UNKNOWN_NAME_STRING,  // table name
         schema,
-        getObjectClasses );
+        getObjectClasses);
 
     if (changeListener != null) {
       changeListener.schemaChanged(schema);
@@ -478,7 +475,7 @@ public class DrillCursor implements Cursor {
           while (qrb != null
               && (qrb.getHeader().getRowCount() == 0 && qrb.getData() == null)) {
             // Empty message--dispose of and try to get another.
-            logger.warn( "Spurious batch read: {}", qrb );
+            logger.warn("Spurious batch read: {}", qrb);
 
             qrb.release();
 
@@ -528,29 +525,22 @@ public class DrillCursor implements Cursor {
           return true;
         }
       }
-      catch ( UserException e ) {
+      catch (UserException e) {
         // A normally expected case--for any server-side error (e.g., syntax
         // error in SQL statement).
         // Construct SQLException with message text from the UserException.
         // TODO:  Map UserException error type to SQLException subclass (once
         // error type is accessible, of course. :-( )
-        throw new SQLException( e.getMessage(), e );
+        throw new SQLException(e.getMessage(), e);
       }
-      catch ( InterruptedException e ) {
+      catch (InterruptedException e) {
         // Not normally expected--Drill doesn't interrupt in this area (right?)--
         // but JDBC client certainly could.
-        throw new SQLException( "Interrupted.", e );
-      }
-      catch ( SchemaChangeException e ) {
-        // TODO:  Clean:  DRILL-2933:  RecordBatchLoader.load(...) no longer
-        // throws SchemaChangeException, so check/clean catch clause.
-        throw new SQLException(
-            "Unexpected SchemaChangeException from RecordBatchLoader.load(...)" );
+        throw new SQLException("Interrupted.", e);
       }
-      catch ( RuntimeException e ) {
-        throw new SQLException( "Unexpected RuntimeException: " + e.toString(), e );
+      catch (RuntimeException e) {
+        throw new SQLException("Unexpected RuntimeException: " + e.toString(), e);
       }
-
     }
   }
 
@@ -562,9 +552,9 @@ public class DrillCursor implements Cursor {
    * <p>
    */
   void loadInitialSchema() throws SQLException {
-    if ( initialSchemaLoaded ) {
+    if (initialSchemaLoaded) {
       throw new IllegalStateException(
-          "loadInitialSchema() called a second time" );
+          "loadInitialSchema() called a second time");
     }
 
     assert ! afterLastRow : "afterLastRow already true in loadInitialSchema()";
@@ -593,7 +583,7 @@ public class DrillCursor implements Cursor {
 
     try {
       resultsListener.awaitFirstMessage();
-    } catch ( InterruptedException e ) {
+    } catch (InterruptedException e) {
       // Preserve evidence that the interruption occurred so that code higher up
       // on the call stack can learn of the interruption and respond to it if it
       // wants to.
@@ -601,7 +591,7 @@ public class DrillCursor implements Cursor {
 
       // Not normally expected--Drill doesn't interrupt in this area (right?)--
       // but JDBC client certainly could.
-      throw new SQLException("Interrupted", e );
+      throw new SQLException("Interrupted", e);
     }
 
     returnTrueForNextCallToNext = true;
@@ -620,17 +610,17 @@ public class DrillCursor implements Cursor {
    */
   @Override
   public boolean next() throws SQLException {
-    if ( ! initialSchemaLoaded ) {
+    if (!initialSchemaLoaded) {
       throw new IllegalStateException(
-          "next() called but loadInitialSchema() was not called" );
+          "next() called but loadInitialSchema() was not called");
     }
     assert afterFirstBatch : "afterFirstBatch still false in next()";
 
-    if ( afterLastRow ) {
+    if (afterLastRow) {
       // We're already after end of rows/records--just report that after end.
       return false;
     }
-    else if ( returnTrueForNextCallToNext ) {
+    else if (returnTrueForNextCallToNext) {
       ++currentRowNumber;
       // We have a deferred "not after end" to report--reset and report that.
       returnTrueForNextCallToNext = false;
@@ -666,5 +656,4 @@ public class DrillCursor implements Cursor {
   public Stopwatch getElapsedTimer() {
     return elapsedTimer;
   }
-
 }