Posted to commits@drill.apache.org by vi...@apache.org on 2018/10/01 12:35:39 UTC

[drill] 01/02: DRILL-6724: Dump operator context to logs when error occurs during query execution

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 98e5de3b5af862779244bac8329852b3c9a901df
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Fri Aug 31 18:20:34 2018 +0300

    DRILL-6724: Dump operator context to logs when error occurs during query execution
    
    closes #1455
---
 .../drill/common/exceptions/UserException.java     |   4 +-
 .../common/exceptions/UserExceptionContext.java    |  19 ++-
 .../store/mapr/db/json/MaprDBJsonRecordReader.java |  17 ++-
 .../drill/exec/store/hbase/HBaseRecordReader.java  |   5 +
 .../store/hive/readers/HiveAbstractReader.java     |  15 ++
 .../drill/exec/store/jdbc/JdbcRecordReader.java    |   8 +-
 .../drill/exec/store/kafka/KafkaRecordReader.java  |  26 +++-
 .../store/kafka/decoders/JsonMessageReader.java    |   4 +
 .../drill/exec/store/kudu/KuduRecordReader.java    |  11 ++
 .../apache/drill/exec/store/kudu/KuduWriter.java   |   5 +
 .../drill/exec/store/mongo/MongoRecordReader.java  |   5 +
 .../drill/exec/physical/config/HashAggregate.java  |   8 ++
 .../physical/config/OrderedPartitionSender.java    |  11 ++
 .../drill/exec/physical/config/PartitionLimit.java |   5 +
 .../apache/drill/exec/physical/config/Sort.java    |   7 +
 .../apache/drill/exec/physical/config/TopN.java    |   7 +
 .../drill/exec/physical/config/WindowPOP.java      |  16 +++
 .../drill/exec/physical/impl/BaseRootExec.java     |  23 +++
 .../apache/drill/exec/physical/impl/RootExec.java  |   6 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  30 +++-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |   9 +-
 .../exec/physical/impl/WriterRecordBatch.java      |   6 +
 .../exec/physical/impl/aggregate/HashAggBatch.java |   9 ++
 .../physical/impl/aggregate/HashAggTemplate.java   |   9 ++
 .../impl/aggregate/SpilledRecordbatch.java         |  35 ++++-
 .../physical/impl/aggregate/StreamingAggBatch.java |   6 +
 .../impl/aggregate/StreamingAggTemplate.java       |  10 ++
 .../physical/impl/filter/FilterRecordBatch.java    |   8 ++
 .../exec/physical/impl/filter/FilterTemplate2.java |   7 +
 .../exec/physical/impl/filter/FilterTemplate4.java |   6 +
 .../impl/filter/RuntimeFilterRecordBatch.java      |   9 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |   6 +
 .../physical/impl/flatten/FlattenTemplate.java     |  11 ++
 .../exec/physical/impl/join/HashJoinBatch.java     |   6 +
 .../physical/impl/join/HashJoinProbeTemplate.java  |  13 ++
 .../exec/physical/impl/join/LateralJoinBatch.java  |   8 ++
 .../exec/physical/impl/join/MergeJoinBatch.java    |   7 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |   8 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |  74 +++++-----
 .../impl/limit/PartitionLimitRecordBatch.java      |  10 +-
 .../impl/mergereceiver/MergingRecordBatch.java     |   8 ++
 .../OrderedPartitionRecordBatch.java               |   6 +
 .../impl/producer/ProducerConsumerBatch.java       |   5 +
 .../physical/impl/project/ProjectRecordBatch.java  |   6 +
 .../physical/impl/project/ProjectorTemplate.java   |   6 +
 .../impl/protocol/OperatorRecordBatch.java         |  18 ++-
 .../drill/exec/physical/impl/sort/SortBatch.java   |   4 +
 .../exec/physical/impl/sort/SortTemplate.java      |   4 +
 .../impl/svremover/RemovingRecordBatch.java        |   5 +
 .../exec/physical/impl/trace/TraceRecordBatch.java |   6 +-
 .../physical/impl/union/UnionAllRecordBatch.java   |   5 +
 .../exec/physical/impl/unnest/UnnestImpl.java      |  10 ++
 .../physical/impl/unnest/UnnestRecordBatch.java    |   7 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  36 +++--
 .../validate/IteratorValidatorBatchIterator.java   |  13 +-
 .../physical/impl/window/FrameSupportTemplate.java |  11 ++
 .../impl/window/NoFrameSupportTemplate.java        |  11 ++
 .../exec/physical/impl/window/WindowDataBatch.java |   5 +
 .../impl/window/WindowFrameRecordBatch.java        |   7 +
 .../physical/impl/xsort/ExternalSortBatch.java     |   4 +
 .../exec/physical/impl/xsort/MSortTemplate.java    |   9 ++
 .../impl/xsort/SingleBatchSorterTemplate.java      |   5 +
 .../impl/xsort/managed/ExternalSortBatch.java      |   7 +
 .../physical/impl/xsort/managed/SortConfig.java    |   9 ++
 .../exec/physical/impl/xsort/managed/SortImpl.java |   8 ++
 .../drill/exec/record/AbstractRecordBatch.java     |  50 +++++--
 .../record/AbstractTableFunctionRecordBatch.java   |   1 -
 .../org/apache/drill/exec/record/RecordBatch.java  |  15 ++
 .../drill/exec/record/RecordBatchLoader.java       |   8 ++
 .../apache/drill/exec/record/RecordIterator.java   |  14 ++
 .../apache/drill/exec/record/SchemalessBatch.java  |  10 ++
 .../drill/exec/record/SimpleRecordBatch.java       |  13 ++
 .../exec/record/selection/SelectionVector4.java    |   9 ++
 .../drill/exec/store/AbstractRecordReader.java     |   1 -
 .../org/apache/drill/exec/store/RecordReader.java  |   4 +-
 .../apache/drill/exec/store/StorageStrategy.java   |   7 +-
 .../drill/exec/store/avro/AvroRecordReader.java    |  15 +-
 .../drill/exec/store/bson/BsonRecordReader.java    |  25 ++--
 .../drill/exec/store/dfs/easy/EasyWriter.java      |   8 ++
 .../exec/store/easy/json/JSONRecordReader.java     |  46 +++---
 .../drill/exec/store/easy/json/JsonProcessor.java  |  16 +--
 .../store/easy/json/reader/BaseJsonProcessor.java  |  48 ++++---
 .../store/easy/json/reader/CountingJsonReader.java |  14 +-
 .../sequencefile/SequenceFileRecordReader.java     |  17 ++-
 .../text/compliant/CompliantTextRecordReader.java  |   9 +-
 .../exec/store/easy/text/compliant/TextReader.java |   8 ++
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   8 ++
 .../drill/exec/store/image/ImageRecordReader.java  |   7 +-
 .../drill/exec/store/log/LogRecordReader.java      |   9 +-
 .../drill/exec/store/parquet/ParquetWriter.java    |   7 +
 .../parquet/columnreaders/ParquetRecordReader.java |  10 ++
 .../exec/store/parquet2/DrillParquetReader.java    |   5 +
 .../drill/exec/store/pcap/PcapRecordReader.java    |   5 +
 .../exec/store/text/DrillTextRecordReader.java     |   9 ++
 .../drill/exec/vector/complex/fn/JsonReader.java   |  62 +++-----
 .../drill/exec/work/fragment/FragmentExecutor.java |   4 +
 .../parquet/hadoop/ColumnChunkIncReadStore.java    |   5 +
 .../java/org/apache/drill/TestOperatorDump.java    | 159 +++++++++++++++++++++
 .../drill/exec/physical/impl/MockRecordBatch.java  |   9 ++
 .../drill/exec/physical/impl/SimpleRootExec.java   |   5 +
 .../physical/impl/unnest/MockLateralJoinBatch.java |   9 ++
 .../drill/exec/work/filter/BloomFilterTest.java    |   9 ++
 .../java/org/apache/drill/test/LogFixture.java     |  12 +-
 103 files changed, 1165 insertions(+), 211 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 56cb935..3a3ca5a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -771,8 +771,10 @@ public class UserException extends DrillRuntimeException {
    * @return generated user error message
    */
   private String generateMessage(boolean includeErrorIdAndIdentity) {
+    boolean seeLogsMessage = errorType == DrillPBError.ErrorType.INTERNAL_ERROR
+        || errorType == DrillPBError.ErrorType.SYSTEM;
     return errorType + " ERROR: " + super.getMessage() + "\n\n" +
-        context.generateContextMessage(includeErrorIdAndIdentity);
+        context.generateContextMessage(includeErrorIdAndIdentity, seeLogsMessage);
   }
 
 }
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
index fa2c437..271462a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.proto.CoordinationProtos;
  */
 class UserExceptionContext {
 
+  private static final String NEW_LINE = System.lineSeparator();
+
   private final String errorId;
   private final List<String> contextList;
 
@@ -133,17 +135,26 @@ class UserExceptionContext {
    * generate a context message
    * @return string containing all context information concatenated
    */
-  String generateContextMessage(boolean includeErrorIdAndIdentity) {
+  String generateContextMessage(boolean includeErrorIdAndIdentity, boolean includeSeeLogsMessage) {
     StringBuilder sb = new StringBuilder();
 
     for (String context : contextList) {
-      sb.append(context).append("\n");
+      sb.append(context)
+          .append(NEW_LINE);
+    }
+
+    if (includeSeeLogsMessage) {
+      sb.append(NEW_LINE)
+          .append("Please, refer to logs for more information.")
+          .append(NEW_LINE);
     }
 
     if (includeErrorIdAndIdentity) {
       // add identification infos
-      sb.append("\n[Error Id: ");
-      sb.append(errorId).append(" ");
+      sb.append(NEW_LINE)
+          .append("[Error Id: ")
+          .append(errorId)
+          .append(" ");
       if (endpoint != null) {
         sb.append("on ")
             .append(endpoint.getAddress())
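
The net effect of the two hunks above: for INTERNAL_ERROR and SYSTEM errors, the
generated user message now ends with a hint to check the logs, which is where the
new batch dumps land. A minimal sketch of the resulting message layout follows;
the helper method and its parameters are assumptions, not part of the patch:

    // Hypothetical helper reproducing the message shape built by
    // generateMessage() + generateContextMessage(..., seeLogsMessage):
    static String sketch(String errorType, String message, String context,
                         boolean seeLogs, String errorId) {
      String nl = System.lineSeparator();
      StringBuilder sb = new StringBuilder(errorType + " ERROR: " + message + "\n\n");
      sb.append(context).append(nl);
      if (seeLogs) { // true only for INTERNAL_ERROR and SYSTEM error types
        sb.append(nl).append("Please, refer to logs for more information.").append(nl);
      }
      sb.append(nl).append("[Error Id: ").append(errorId).append(" ");
      return sb.toString();
    }
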
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 3c7ca8e..b68f574 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private final String tableName;
   private OperatorContext operatorContext;
   private VectorContainerWriter vectorWriter;
+  private DBDocumentReaderBase reader;
 
   private DrillBuf buffer;
 
@@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     vectorWriter.reset();
 
     int recordCount = 0;
-    DBDocumentReaderBase reader = null;
+    reader = null;
 
     while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
       vectorWriter.setPosition(recordCount);
@@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       table.close();
     }
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=")
+        .append(table.getPath());
+    if (reader != null) {
+      sb.append(", Document ID=")
+          .append(IdCodec.asString(reader.getId()));
+    }
+    sb.append(", reader=")
+        .append(reader)
+        .append(']');
+    return sb.toString();
+  }
 }
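
These reader-level toString() overrides feed the operator dumps added later in
this patch: ScanBatch.dump() logs currentReader through an slf4j {} placeholder,
which renders its argument via toString(). A minimal, hypothetical illustration
(the demo class and the table path below are invented):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class PlaceholderDemo {
      private static final Logger logger = LoggerFactory.getLogger(PlaceholderDemo.class);

      public static void main(String[] args) {
        Object currentReader = new Object() {
          @Override
          public String toString() {
            return "MaprDBJsonRecordReader[Table=/tables/users, reader=null]";
          }
        };
        // The {} placeholder is replaced with currentReader.toString():
        logger.error("ScanBatch[currentReader={}]", currentReader);
      }
    }
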
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 86038c4..2db1d02 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     return rowCount < TARGET_RECORD_COUNT &&
         operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
   }
+
+  @Override
+  public String toString() {
+    return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]";
+  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
index 354a61e..ba1cd30 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
     }
   }
 
+  @Override
+  public String toString() {
+    long position = -1;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: " + e.getMessage());
+    }
+    return getClass().getSimpleName() + "[Database=" + table.getDbName()
+        + ", Table=" + table.getTableName()
+        + ", Position=" + position
+        + "]";
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 1b6e211..cd732a6 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader {
     AutoCloseables.close(resultSet, statement, connection);
   }
 
+  @Override
+  public String toString() {
+    return "JdbcRecordReader[sql=" + sql
+        + ", Plugin=" + storagePluginName
+        + "]";
+  }
+
   private abstract class Copier<T extends ValueVector.Mutator> {
     protected final int columnIndex;
     protected final ResultSet result;
@@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader {
     }
 
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index d715ada..9559c3d 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
   private final String kafkaMsgReader;
+  private int currentMessageCount;
 
   public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
@@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader {
     writer.allocate();
     writer.reset();
     Stopwatch watch = Stopwatch.createStarted();
-    int messageCount = 0;
+    currentMessageCount = 0;
 
     try {
       while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
         ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
         currentOffset = consumerRecord.offset();
-        writer.setPosition(messageCount);
+        writer.setPosition(currentMessageCount);
         messageReader.readMessage(consumerRecord);
-        if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+        if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
           break;
         }
       }
 
       messageReader.ensureAtLeastOneField();
-      writer.setValueCount(messageCount);
-      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+      writer.setValueCount(currentMessageCount);
+      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
       logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
           currentOffset);
-      return messageCount;
+      return currentMessageCount;
     } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
       throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
     }
   }
@@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader {
     messageReader.close();
   }
 
+  @Override
+  public String toString() {
+    return "KafkaRecordReader[messageReader=" + messageReader
+        + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+        + ", currentOffset=" + currentOffset
+        + ", enableAllTextMode=" + enableAllTextMode
+        + ", readNumbersAsDouble=" + readNumbersAsDouble
+        + ", kafkaMsgReader=" + kafkaMsgReader
+        + ", currentMessageCount=" + currentMessageCount
+        + "]";
+  }
 }
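
Note why the local messageCount is promoted to the field currentMessageCount:
toString() must still be able to report how far the reader got after next() has
thrown. The same local-to-field promotion recurs throughout this patch. In
miniature (hypothetical class):

    class ProgressDemo {
      private int currentMessageCount; // was a local variable before the change

      int readBatch() {
        for (currentMessageCount = 0; currentMessageCount < 10; currentMessageCount++) {
          // read one message; may throw at any point
        }
        return currentMessageCount;
      }

      @Override
      public String toString() { // still sees the count reached before the failure
        return "ProgressDemo[currentMessageCount=" + currentMessageCount + "]";
      }
    }
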
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a62357d..40e9e12 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader {
     }
   }
 
+  @Override
+  public String toString() {
+    return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 845738c..976b16d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader {
   private OutputMutator output;
   private OperatorContext context;
 
+  private String lastColumnName;
+  private Type lastColumnType;
+
   private static class ProjectedColumnInfo {
     int index;
     ValueVector vv;
@@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader {
 
       final String name = col.getName();
       final Type kuduType = col.getType();
+      lastColumnName = name;
+      lastColumnType = kuduType;
       MinorType minorType = TYPES.get(kuduType);
       if (minorType == null) {
         logger.warn("Ignoring column that is unsupported.", UserException
@@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader {
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    return "KuduRecordReader[Column=" + lastColumnName
+        + ", Type=" + lastColumnType
+        + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index d0fa158..7611576 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter {
   public KuduStoragePlugin getPlugin() {
     return plugin;
   }
+
+  @Override
+  public String toString() {
+    return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]";
+  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index a79e39a..f5d1f2e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader {
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    Object reader = isBsonRecordReader ? bsonReader : jsonReader;
+    return "MongoRecordReader[reader=" + reader + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index da988de..521aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -106,4 +106,12 @@ public class HashAggregate extends AbstractSingle {
     return queryContext == null ||
       1 < (int) queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
   }
+
+  @Override
+  public String toString() {
+    return "HashAggregate[groupByExprs=" + groupByExprs
+        + ", aggrExprs=" + aggrExprs
+        + ", cardinality=" + cardinality
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index d28d563..320bc6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -106,4 +106,15 @@ public class OrderedPartitionSender extends AbstractSender {
   public int getOperatorType() {
     return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "OrderedPartitionSender[orderings=" + orderings
+        + ", ref=" + ref
+        + ", sendingWidth=" + sendingWidth
+        + ", recordsToSample=" + recordsToSample
+        + ", samplingFactor=" + samplingFactor
+        + ", completionFactor=" + completionFactor
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
index 29f8bb2..4ea710d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -59,4 +59,9 @@ public class PartitionLimit extends Limit {
   public int getOperatorType() {
     return CoreOperatorType.PARTITION_LIMIT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "PartitionLimit[partitionColumn=" + partitionColumn + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 85ef7da..5d65c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -71,4 +71,11 @@ public class Sort extends AbstractSingle{
   public int getOperatorType() {
     return CoreOperatorType.OLD_SORT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "Sort[orderings=" + orderings
+        + ", reverse=" + reverse
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
index 54d1b4d..d2a87d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
@@ -69,4 +69,11 @@ public class TopN extends Sort {
     return CoreOperatorType.TOP_N_SORT_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "TopN[orderings=" + orderings
+        + ", reverse=" + reverse
+        + ", limit=" + limit
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
index 543c09f..3ddaa7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -95,6 +95,17 @@ public class WindowPOP extends AbstractSingle {
     return frameUnitsRows;
   }
 
+  @Override
+  public String toString() {
+    return "WindowPOP[withins=" + withins
+        + ", aggregations=" + aggregations
+        + ", orderings=" + orderings
+        + ", frameUnitsRows=" + frameUnitsRows
+        + ", start=" + start
+        + ", end=" + end
+        + "]";
+  }
+
   @JsonTypeName("windowBound")
   public static class Bound {
     private final boolean unbounded;
@@ -117,6 +128,11 @@ public class WindowPOP extends AbstractSingle {
     public long getOffset() {
       return offset;
     }
+
+    @Override
+    public String toString() {
+      return "Bound[unbounded=" + unbounded + ", offset=" + offset + "]";
+    }
   }
 
   public static Bound newBound(RexWindowBound windowBound) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index e148278..9142a2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.DeferredException;
@@ -124,6 +125,28 @@ public abstract class BaseRootExec implements RootExec {
   }
 
   @Override
+  public void dumpBatches() {
+    final int numberOfBatchesToDump = 2;
+    logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
+    // As batches are stored in a 'flat' list, we filter out the failed batch
+    // and a few of its parents (the actual number is set by the constant defined above)
+    List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
+    for (int i = operators.size() - 1; i >= 0; i--) {
+      CloseableRecordBatch batch = operators.get(i);
+      if (batch.hasFailed()) {
+        failedBatchStack.add(0, batch);
+        if (failedBatchStack.size() == numberOfBatchesToDump) {
+          break;
+        }
+      }
+    }
+    for (CloseableRecordBatch batch : failedBatchStack) {
+      batch.dump();
+    }
+    logger.error("Batch dump completed.");
+  }
+
+  @Override
   public void close() throws Exception {
     // We want to account for the time spent waiting here as Wait time in the operator profile
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index df0f89b..34f2131 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -44,4 +44,10 @@ public interface RootExec extends AutoCloseable {
    * @param handle The handle pointing to the downstream receiver that does not need anymore data.
    */
   void receivingFragmentFinished(FragmentHandle handle);
+
+  /**
+   * Dumps the state of the failed batches, preceded by their parents' state, to the logs.
+   * Invoked when there is a failure during fragment execution.
+   */
+  void dumpBatches();
 }
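
The diffstat shows a four-line change to FragmentExecutor.java that is not
included in this excerpt; based on the javadoc above, the expected call site
looks roughly like the following (a sketch of the assumed wiring, not the
actual hunk):

    // Hypothetical failure path driving the operator tree:
    void run(RootExec root) throws Exception {
      try {
        while (root.next()) {
          // drive the fragment batch by batch
        }
      } catch (Exception e) {
        root.dumpBatches(); // log the failed batch and its parent before failing
        throw e;
      }
    }
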
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cd24a4c..dc8dd0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,6 +83,10 @@ public class ScanBatch implements CloseableRecordBatch {
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
   private final RecordBatchStatsContext batchStatsContext;
+  // Represents the last outcome of next(). If an exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
+
   /**
    *
    * @param context
@@ -160,7 +164,8 @@ public class ScanBatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
     if (done) {
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
     oContext.getStats().startProcessing();
     try {
@@ -168,7 +173,8 @@ public class ScanBatch implements CloseableRecordBatch {
         if (currentReader == null && !getNextReaderIfHas()) {
           releaseAssets(); // All data has been read. Release resource.
           done = true;
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          return lastOutcome;
         }
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
         currentReader.allocate(mutator.fieldVectorMap());
@@ -191,7 +197,8 @@ public class ScanBatch implements CloseableRecordBatch {
           // This could happen when data sources have a non-trivial schema with 0 row.
           container.buildSchema(SelectionVectorMode.NONE);
           schema = container.getSchema();
-          return IterOutcome.OK_NEW_SCHEMA;
+          lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+          return lastOutcome;
         }
 
         // Handle case of same schema.
@@ -199,11 +206,13 @@ public class ScanBatch implements CloseableRecordBatch {
             continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
         } else {
           // return OK if recordCount > 0 && ! isNewSchema
-          return IterOutcome.OK;
+          lastOutcome = IterOutcome.OK;
+          return lastOutcome;
         }
       }
     } catch (OutOfMemoryException ex) {
       clearFieldVectorMap();
+      lastOutcome = IterOutcome.STOP;
       throw UserException.memoryError(ex).build(logger);
     } catch (ExecutionSetupException e) {
       if (currentReader != null) {
@@ -213,12 +222,15 @@ public class ScanBatch implements CloseableRecordBatch {
           logger.error("Close failed for reader " + currentReaderClassName, e2);
         }
       }
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(e)
           .addContext("Setup failed for", currentReaderClassName)
           .build(logger);
     } catch (UserException ex) {
+      lastOutcome = IterOutcome.STOP;
       throw ex;
     } catch (Exception ex) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
@@ -559,4 +571,14 @@ public class ScanBatch implements CloseableRecordBatch {
 
     return true;
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
+  }
 }
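
ScanBatch is the template for the many per-operator changes that follow: track
the last IterOutcome, set it to STOP on any throw, report failure through
hasFailed(), and log operator state in dump(). Distilled to its skeleton
(hypothetical class; doWork() stands in for each operator's next() logic, and
IterOutcome is the enum from org.apache.drill.exec.record.RecordBatch):

    public abstract class OutcomeTrackingBatch {
      private IterOutcome lastOutcome;

      public IterOutcome next() {
        try {
          lastOutcome = doWork();
          return lastOutcome;
        } catch (Exception e) {
          lastOutcome = IterOutcome.STOP; // remembered for hasFailed()
          throw e;
        }
      }

      public boolean hasFailed() {
        return lastOutcome == IterOutcome.STOP;
      }

      protected abstract IterOutcome doWork();

      public abstract void dump(); // log operator-specific state at ERROR level
    }
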
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index ba4b94a..22dfdf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -74,6 +74,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
  * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
@@ -185,7 +186,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     // Reset the TopN state for next iteration
     resetTopNState();
 
-    try{
+    try {
       boolean incomingHasSv2 = false;
       switch (incoming.getSchema().getSelectionVectorMode()) {
         case NONE: {
@@ -693,4 +694,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       return sv4;
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " +
+        "batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 65d0c54..3a8485a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -209,4 +209,10 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     closeWriter();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WriterRecordBatch[container={}, popConfig={}, counter={}, fragmentUniqueId={}, schema={}]",
+        container, popConfig, counter, fragmentUniqueId, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 9de9aae..80d25ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -500,4 +501,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     wasKilled = true;
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " +
+            "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
+        container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema,
+        wasKilled, numGroupByExprs, numAggrExprs, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index e8ae30e..f6dd3da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
 
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -1603,6 +1604,14 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
+  @Override
+  public String toString() {
+    // The fields are excluded because they are passed from HashAggBatch
+    String[] excludedFields = new String[] {
+        "baseHashTable", "incoming", "outgoing", "context", "oContext", "allocator", "htables", "newIncoming"};
+    return ReflectionToStringBuilder.toStringExclude(this, excludedFields);
+  }
+
   // Code-generated methods (implemented in HashAggBatch)
   public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
 
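ReflectionToStringBuilder.toStringExclude(Object, String...) is an Apache
Commons Lang 3 helper: it renders every non-excluded field via reflection,
which is how the bulky references passed in from HashAggBatch get skipped. A
small usage example (class and fields are hypothetical):

    import org.apache.commons.lang3.builder.ReflectionToStringBuilder;

    class Example {
      int rowCount = 42;
      Object incoming = new Object(); // bulky upstream reference, excluded below

      @Override
      public String toString() {
        return ReflectionToStringBuilder.toStringExclude(this, "incoming");
      }
    }

    // new Example().toString() prints something like:
    // Example@1b6d3586[rowCount=42]
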
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 7ebce2b..822a810 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -40,6 +41,9 @@ import java.util.Iterator;
  * A class to replace "incoming" - instead scanning a spilled partition file
  */
 public class SpilledRecordbatch implements CloseableRecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private InputStream spillStream;
   private int spilledBatches;
@@ -49,6 +53,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   private String spillFile;
   VectorAccessibleSerializable vas;
   private IterOutcome initialOutcome;
+  // Represents the last outcome of next(). If an exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
 
   public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
     this.context = context;
@@ -66,6 +73,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
     }
 
     initialOutcome = next(); // initialize the container
+    lastOutcome = initialOutcome;
   }
 
   @Override
@@ -126,14 +134,19 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
 
-    if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
+    if (!context.getExecutorState().shouldContinue()) {
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    }
 
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
 
     if ( spillStream == null ) {
+      lastOutcome = IterOutcome.STOP;
       throw new IllegalStateException("Spill stream was null");
     }
 
@@ -152,11 +165,16 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
         container = vas.get();
       }
     } catch (IOException e) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     }
 
     spilledBatches--; // one less batch to read
-    return IterOutcome.OK;
+    lastOutcome = IterOutcome.OK;
+    return lastOutcome;
   }
 
   /**
@@ -164,6 +182,17 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
    */
   public IterOutcome getInitialOutcome() { return initialOutcome; }
 
+  @Override
+  public void dump() {
+    logger.error("SpilledRecordbatch[container={}, spilledBatches={}, schema={}, spillFile={}, spillSet={}]",
+        container, spilledBatches, schema, spillFile, spillSet);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   /**
    * Note: ignoring any IO errors (e.g. file not found)
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c42f9bf..2b9b317 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -648,4 +648,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
+        container, popConfig, aggregator, incomingSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index f30616b..4bde7ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -498,6 +498,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   public void cleanup() {
   }
 
+  @Override
+  public String toString() {
+    return "StreamingAggTemplate[underlyingIndex=" + underlyingIndex
+        + ", previousIndex=" + previousIndex
+        + ", currentIndex=" + currentIndex
+        + ", addedRecordCount=" + addedRecordCount
+        + ", outputCount=" + outputCount
+        + "]";
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b8d4e76..179e6c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -43,6 +43,9 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private Filterer filter;
@@ -196,4 +199,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 7b0183b..483a9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -119,4 +119,11 @@ public abstract class FilterTemplate2 implements Filterer {
                                  @Named("outIndex") int outIndex)
                           throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "FilterTemplate2[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + ", svMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index e09ed75..d85d6f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -64,4 +64,10 @@ public abstract class FilterTemplate4 implements Filterer {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
+  @Override
+  public String toString() {
+    return "FilterTemplate4[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 7faaaa5..bc21580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -247,4 +247,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       }
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public void dump() {
+    logger.error("RuntimeFilterRecordBatch[container={}, selectionVector={}, toFilterFields={}, "
+        + "originalRecordCount={}, batchSchema={}]",
+        container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 1623319..86ddcd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -532,4 +532,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     updateStats();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("FlattenRecordbatch[hasRemainder={}, remainderIndex={}, recordCount={}, flattener={}, container={}]",
+        hasRemainder, remainderIndex, recordCount, flattener, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index e59abac..fe38244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -175,4 +175,15 @@ public abstract class FlattenTemplate implements Flattener {
                                @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean doEval(@Named("inIndex") int inIndex,
                                  @Named("outIndex") int outIndex) throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "FlattenTemplate[svMode=" + svMode
+        + ", fieldToFlatten=" + fieldToFlatten
+        + ", valueIndex=" + valueIndex
+        + ", outputLimit=" + outputLimit
+        + ", innerValueIndex=" + innerValueIndex
+        + ", currentInnerValueIndex=" + currentInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5d..89ab8d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1276,4 +1276,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     return hj;
   }
 
+  @Override
+  public void dump() {
+    logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
+            " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
+        joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 639f757..71abeda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -432,4 +432,17 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
       (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
         ProbeState.DONE; // else we're done
   }
+
+  @Override
+  public String toString() {
+    return "HashJoinProbeTemplate[container=" + container
+        + ", probeSchema=" + probeSchema
+        + ", joinType=" + joinType
+        + ", recordsToProcess=" + recordsToProcess
+        + ", recordsProcessed=" + recordsProcessed
+        + ", outputRecords=" + outputRecords
+        + ", probeState=" + probeState
+        + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 1aaf5e2..242687f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1205,4 +1205,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " +
+            "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
+        container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex,
+        leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 72f776a..d502c4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -547,4 +547,11 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     }
     return materializedExpr;
   }
+
+  @Override
+  public void dump() {
+    logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}," +
+            " rightIterator={}, joinStatus={}, joinType={}]",
+        container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status, joinType);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index e2f93ec..6b0c749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -459,4 +459,12 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   public int getRecordCount() {
     return outputRecords;
   }
+
+  @Override
+  public void dump() {
+    logger.error("NestedLoopJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "leftSchema={}, rightSchema={}, outputRecords={}, rightContainer={}, rightCounts={}]",
+        container, left, right, leftUpstream, rightUpstream,
+        leftSchema, rightSchema, outputRecords, rightContainer, rightCounts);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 784d955..bb49187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -37,7 +37,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -58,46 +58,46 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   @Override
   public IterOutcome innerNext() {
     if (!first && !needMoreRecords(numberOfRecords)) {
-        outgoingSv.setRecordCount(0);
-        incoming.kill(true);
+      outgoingSv.setRecordCount(0);
+      incoming.kill(true);
 
-        IterOutcome upStream = next(incoming);
+      IterOutcome upStream = next(incoming);
+      if (upStream == IterOutcome.OUT_OF_MEMORY) {
+        return upStream;
+      }
+
+      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
+        }
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
+        }
+        upStream = next(incoming);
         if (upStream == IterOutcome.OUT_OF_MEMORY) {
           return upStream;
         }
-
-        while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-          upStream = next(incoming);
-          if (upStream == IterOutcome.OUT_OF_MEMORY) {
-            return upStream;
-          }
+      }
+      // If EMIT, the leaf operator is UNNEST; in this case refresh the limit states and return EMIT.
+      if (upStream == EMIT) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
         }
-        // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-        if (upStream == EMIT) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-
-          refreshLimitState();
-          return upStream;
+
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
         }
-        // other leaf operator behave as before.
-        return NONE;
+
+        refreshLimitState();
+        return upStream;
       }
+      // other leaf operators behave as before.
+      return NONE;
+    }
     return super.innerNext();
   }
 
@@ -266,4 +266,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
     first = true;
   }
+
+  @Override
+  public void dump() {
+    logger.error("LimitRecordBatch[container={}, offset={}, numberOfRecords={}, incomingSV={}, outgoingSV={}]",
+        container, recordStartOffset, numberOfRecords, incomingSv, outgoingSv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index fe1660f..ed7b265 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -40,7 +40,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
  * implicit column for rowId for each row.
  */
 public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionLimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -250,4 +250,12 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti
     numberOfRecords = (popConfig.getLast() == null) ?
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
   }
+
+  @Override
+  public void dump() {
+    logger.error("PartitionLimitRecordBatch[container={}, popConfig={}, incomingSV={}, outgoingSV={},"
+            + " recordStartOffset={}, numberOfRecords={}, partitionId={}, unionTypeEnabled={}, state={}]",
+        container, popConfig, incomingSv, outgoingSv, recordStartOffset, numberOfRecords,
+        partitionId, unionTypeEnabled, state);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 791b24a..12ee668 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.mergereceiver;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -827,4 +828,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, "
+            + "tempBatchHolder={}, inputCounts={}, outputCounts={}]",
+        container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets),
+        Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index a3aa11b..63a0121 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -646,4 +646,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " +
+            "recordsSampled={}, recordCount={}]",
+        container, popConfig, partitionVectors, partitions, recordsSampled, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bbcb758..9385400 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -247,4 +247,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]",
+        container, recordCount, schema, stop);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4d55f00..8ea15d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -898,4 +898,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     wasNone = true;
     return IterOutcome.OK_NEW_SCHEMA;
   }
+
+  @Override
+  public void dump() {
+    logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]",
+        projector, hasRemainder, remainderIndex, recordCount, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 02ccd4b..2f1aa02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -110,4 +110,10 @@ public abstract class ProjectorTemplate implements Projector {
                               @Named("outIndex") int outIndex)
                        throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "Projector[vector2=" + vector2
+        + ", selectionVectorMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 620f150..e0beab1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -55,6 +55,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
 
   private final OperatorDriver driver;
   private final BatchAccessor batchAccessor;
+  private IterOutcome lastOutcome;
 
   public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
     OperatorContext opContext = context.newOperatorContext(config);
@@ -143,7 +144,12 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   public IterOutcome next() {
     try {
       driver.operatorContext().getStats().startProcessing();
-      return driver.next();
+      lastOutcome = driver.next();
+      return lastOutcome;
+    } catch (Exception e) {
+      // mark batch as failed
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       driver.operatorContext().getStats().stopProcessing();
     }
@@ -158,4 +164,14 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   public VectorContainer getContainer() {
     return batchAccessor.getOutgoingContainer();
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome);
+  }
 }
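
For context, the sketch below shows one way the new hasFailed()/dump() pair can be
consumed when a query fails: walk the record batches of the failed fragment and ask
each failed one to dump its state. This is a minimal sketch, not code added by this
commit; the FailureDump helper and the way the batch list is obtained are assumptions.

    import java.util.List;

    import org.apache.drill.exec.record.RecordBatch;

    final class FailureDump {
      // Given the record batches of a failed fragment (however the caller
      // collected them), log the state of each batch that reports failure.
      static void dumpFailed(List<RecordBatch> batches) {
        for (RecordBatch batch : batches) {
          if (batch.hasFailed()) {
            batch.dump(); // each implementation logs its own fields at ERROR level
          }
        }
      }
    }
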
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fc49d43..fcbb10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -213,4 +213,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("SortBatch[popConfig={}, container={}, sorter={}, schema={}]", popConfig, container, sorter, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index da476cc..b3c6a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -70,4 +70,8 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
 
+  @Override
+  public String toString() {
+    return "SortTemplate[vector4=" + vector4 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1471d5e..a8c3622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -92,4 +92,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
+
+  @Override
+  public void dump() {
+    logger.error("RemovingRecordBatch[container={}, state={}, copier={}]", container, state, copier);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 50cb26b..d56f848 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-/* TraceRecordBatch contains value vectors which are exactly the same
+/** TraceRecordBatch contains value vectors which are exactly the same
  * as the incoming record batch's value vectors. If the incoming
  * record batch has a selection vector (type 2) then TraceRecordBatch
  * will also contain a selection vector.
@@ -171,4 +171,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("TraceRecordBatch[filename={}, logLocation={}, selectionVector={}]", getFileName(), logLocation, sv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 7e16d6a..e83fddf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -440,4 +440,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
       batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnionAllRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "recordCount={}]", container, left, right, leftUpstream, rightUpstream, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 285481f..508999f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -191,4 +191,14 @@ public class UnnestImpl implements Unnest {
       transfers = null;
     }
   }
+
+  @Override
+  public String toString() {
+    return "UnnestImpl[svMode=" + svMode
+        + ", outputLimit=" + outputLimit
+        + ", valueIndex=" + valueIndex
+        + ", innerValueIndex=" + innerValueIndex
+        + ", runningInnerValueIndex=" + runningInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 5f63967..1c8336d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -241,7 +241,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     }
   }
 
-    @Override
+  @Override
   public VectorContainer getOutgoingContainer() {
     return this.container;
   }
@@ -446,4 +446,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnnestRecordBatch[container={}, unnest={}, hasRemainder={}, remainderIndex={}, " +
+            "unnestFieldMetadata={}]", container, unnest, hasRemainder, remainderIndex, unnestFieldMetadata);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8cdc0a1..a6ebef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -64,6 +64,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   private boolean first = true;
   private final UnorderedReceiver config;
   private final OperatorContext oContext;
+  // Represents the last outcome of next(). If an exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -156,7 +159,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   public IterOutcome next() {
     batchLoader.resetRecordCount();
     stats.startProcessing();
-    try{
+    try {
       RawFragmentBatch batch;
       try {
         stats.startWait();
@@ -174,15 +177,17 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       first = false;
 
       if (batch == null) {
+        lastOutcome = IterOutcome.NONE;
         batchLoader.zero();
         if (!context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
+          lastOutcome = IterOutcome.STOP;
         }
-        return IterOutcome.NONE;
+        return lastOutcome;
       }
 
       if (context.getAllocator().isOverLimit()) {
-        return IterOutcome.OUT_OF_MEMORY;
+        lastOutcome = IterOutcome.OUT_OF_MEMORY;
+        return lastOutcome;
       }
 
       final RecordBatchDef rbd = batch.getHeader().getDef();
@@ -195,14 +200,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       if(schemaChanged) {
         this.schema = batchLoader.getSchema();
         stats.batchReceived(0, rbd.getRecordCount(), true);
-        return IterOutcome.OK_NEW_SCHEMA;
+        lastOutcome = IterOutcome.OK_NEW_SCHEMA;
       } else {
         stats.batchReceived(0, rbd.getRecordCount(), false);
-        return IterOutcome.OK;
+        lastOutcome = IterOutcome.OK;
       }
-    } catch(SchemaChangeException | IOException ex) {
+      return lastOutcome;
+    } catch (SchemaChangeException | IOException ex) {
       context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -270,4 +280,14 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 88f4c7d..1ea3895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -322,8 +322,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
       }
 
       return batchState;
-    }
-    catch (RuntimeException | Error e) {
+    } catch (RuntimeException | Error e) {
       exceptionState = e;
       logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
                    instNum, batchTypeName, prevBatchState, exceptionState,
@@ -366,4 +365,14 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
   public RecordBatch getIncoming() { return incoming; }
 
+  @Override
+  public boolean hasFailed() {
+    return exceptionState != null || batchState == STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
+           + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5cd6de7..1e477ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -295,6 +295,17 @@ public abstract class FrameSupportTemplate implements WindowFramer {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", frameLastRow=" + frameLastRow
+        + ", remainingRows=" + remainingRows
+        + ", partialPartition=" + partialPartition
+        + "]";
+  }
+
   /**
    * called once for each peer row of the current frame.
    * @param index of row to aggregate
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index 55c27c1..cc7a04d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -298,6 +298,16 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", requireFullPartition=" + requireFullPartition
+        + ", partition=" + partition
+        + "]";
+  }
+
   /**
    * called once for each row after we evaluate all peer rows. Used to write a value in the row
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index 7d98724..a9baea9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -106,4 +106,9 @@ public class WindowDataBatch implements VectorAccessible {
   public void clear() {
     container.clear();
   }
+
+  @Override
+  public String toString() {
+    return "WindowDataBatch[container=" + container + ", recordCount=" + recordCount + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index a372a3c..59e84ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
@@ -426,4 +427,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   public int getRecordCount() {
     return framers[0].getOutputCount();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WindowFrameRecordBatch[container={}, popConfig={}, framers={}, schema={}]",
+        container, popConfig, Arrays.toString(framers), schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0edf974..262a241 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -824,4 +824,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sorter={}, mSorter={}, container={}]", schema, sorter, mSorter, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 5ebec50..9b10d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -206,4 +206,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                              @Named("rightIndex") int rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "MSortTemplate[vector4=" + vector4
+        + ", aux=" + aux
+        + ", runStarts=" + runStarts
+        + ", desiredRecordBatchCount=" + desiredRecordBatchCount
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index de783df..57d2ec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -83,4 +83,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
   public abstract int doEval(@Named("leftIndex") char leftIndex,
                              @Named("rightIndex") char rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "SinglebatchSorterTemplate[vector2=" + vector2 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 7db4d3b..8fc4a74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -698,4 +698,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
     return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
   }
+
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sortState={}, sortConfig={}, outputWrapperContainer={}, "
+            + "outputSV4={}, container={}]",
+        schema, sortState, sortConfig, outputWrapperContainer, outputSV4, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index e592ccb..fbcf1ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -137,6 +137,15 @@ public class SortConfig {
                   mergeBatchSize, mSortBatchSize);
   }
 
+  @Override
+  public String toString() {
+    return "SortConfig[spillFileSize=" + spillFileSize
+        + ", spillBatchSize=" + spillBatchSize
+        + ", mergeBatchSize=" + mergeBatchSize
+        + ", mSortBatchSize=" + mSortBatchSize
+        + "]";
+  }
+
   public long maxMemory() { return maxMemory; }
   public int mergeLimit() { return mergeLimit; }
   public long spillFileSize() { return spillFileSize; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index ac30e94..79294cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -592,4 +592,12 @@ public class SortImpl {
       throw ex;
     }
   }
+
+  @Override
+  public String toString() {
+    return "SortImpl[config=" + config
+        + ", outputBatch=" + outputBatch
+        + ", sizer=" + sizer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c38de2d..362ea29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -47,6 +47,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final boolean unionTypeEnabled;
   protected BatchState state;
 
+  // Represents the last outcome of next(). If an exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
+
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
     this(popConfig, context, true, context.newOperatorContext(popConfig));
   }
@@ -113,7 +117,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   public final IterOutcome next(final int inputIndex, final RecordBatch b){
-    IterOutcome next = null;
+    IterOutcome next;
     stats.stopProcessing();
     try{
       if (!context.getExecutorState().shouldContinue()) {
@@ -132,15 +136,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     }
 
     switch(next) {
-    case OK_NEW_SCHEMA:
-      stats.batchReceived(inputIndex, b.getRecordCount(), true);
-      break;
-    case OK:
-    case EMIT:
-      stats.batchReceived(inputIndex, b.getRecordCount(), false);
-      break;
-    default:
-      break;
+      case OK_NEW_SCHEMA:
+        stats.batchReceived(inputIndex, b.getRecordCount(), true);
+        break;
+      case OK:
+      case EMIT:
+        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        break;
+      default:
+        break;
     }
 
     return next;
@@ -155,27 +159,38 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
           buildSchema();
           switch (state) {
             case DONE:
-              return IterOutcome.NONE;
+              lastOutcome = IterOutcome.NONE;
+              break;
             case OUT_OF_MEMORY:
               // because we don't support schema changes, it is safe to fail the query right away
               context.getExecutorState().fail(UserException.memoryError()
                 .build(logger));
               // FALL-THROUGH
             case STOP:
-              return IterOutcome.STOP;
+              lastOutcome = IterOutcome.STOP;
+              break;
             default:
               state = BatchState.FIRST;
-              return IterOutcome.OK_NEW_SCHEMA;
+              lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+              break;
           }
+          break;
         }
         case DONE: {
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          break;
         }
         default:
-          return innerNext();
+          lastOutcome = innerNext();
+          break;
       }
+      return lastOutcome;
     } catch (final SchemaChangeException e) {
+      lastOutcome = IterOutcome.STOP;
       throw new DrillRuntimeException(e);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -245,6 +260,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return  container;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   public RecordBatchStatsContext getRecordBatchStatsContext() {
     return batchStatsContext;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index cb3a61c..dda4ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -59,6 +59,5 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato
     setIncoming(incoming.getIncoming());
     lateral = incoming;
   }
-
 }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c65827c..7473c8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -320,4 +320,19 @@ public interface RecordBatch extends VectorAccessible {
    * buffers.
    */
   WritableBatch getWritableBatch();
+
+  /**
+   * Performs a dump of this batch's state to the logs.
+   */
+  void dump();
+
+  /**
+   * Reports whether the batch has failed. Currently used when logging a {@code RecordBatch}'s
+   * state with the {@link #dump()} method.
+   *
+   * @return {@code true} if {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP}
+   * was returned by this batch's or its child's {@link #next()} invocation, or if an
+   * {@code Exception} was thrown during execution of the batch; {@code false} otherwise
+   */
+  boolean hasFailed();
 }
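
A minimal sketch of the implementation pattern this commit uses for the two new
methods, assuming a hypothetical batch class (ExampleBatch and its fields are
illustrative, not part of the commit): remember the outcome of next(), treat STOP
as failure, and log the operator's fields with a parameterized SLF4J message.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import org.apache.drill.exec.record.RecordBatch;

    public abstract class ExampleBatch implements RecordBatch {
      private static final Logger logger = LoggerFactory.getLogger(ExampleBatch.class);

      // Set by next(); IterOutcome.STOP is assigned when an exception is thrown.
      private IterOutcome lastOutcome;

      @Override
      public void dump() {
        logger.error("ExampleBatch[container={}, lastOutcome={}]", getContainer(), lastOutcome);
      }

      @Override
      public boolean hasFailed() {
        return lastOutcome == IterOutcome.STOP;
      }
    }
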
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e841f2f..696d6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -292,4 +292,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     container.clear();
     resetRecordCount();
   }
+
+  @Override
+  public String toString() {
+    return "RecordBatchLoader[container=" + container
+        + ", valueCount=" + valueCount
+        + ", schema=" + schema
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 47b11a6..04cf32e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -367,4 +367,18 @@ public class RecordIterator implements VectorAccessible {
     clear();
     clearInflightBatches();
   }
+
+  @Override
+  public String toString() {
+    return "RecordIterator[outerPosition=" + outerPosition
+        + ", innerPosition=" + innerPosition
+        + ", innerRecordCount=" + innerRecordCount
+        + ", totalRecordCount=" + totalRecordCount
+        + ", startBatchPosition=" + startBatchPosition
+        + ", markedInnerPosition" + markedInnerPosition
+        + ", markedOuterPosition=" + markedOuterPosition
+        + ", lastOutcome=" + lastOutcome
+        + ", inputIndex=" + inputIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 9dfa129..e4278ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -107,4 +107,14 @@ public class SchemalessBatch implements CloseableRecordBatch {
 
   @Override
   public VectorContainer getContainer() { return null; }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("SchemalessBatch[]");
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index 4063e55..c588f25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -28,6 +28,9 @@ import java.util.Iterator;
  * Wrap a VectorContainer into a record batch.
  */
 public class SimpleRecordBatch implements RecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private FragmentContext context;
 
@@ -99,4 +102,14 @@ public class SimpleRecordBatch implements RecordBatch {
   public VectorContainer getContainer() {
      return container;
   }
+
+  @Override
+  public void dump() {
+    logger.error("SimpleRecordBatch[container=" + container + "]");
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index a0b47ed..4f4f88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -161,4 +161,13 @@ public class SelectionVector4 implements AutoCloseable {
   public void close() {
     clear();
   }
+
+  @Override
+  public String toString() {
+    return "SelectionVector4[data=" + data
+        + ", recordCount=" + recordCount
+        + ", start=" + start
+        + ", length=" + length
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 7accdc4..9314da6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -104,5 +104,4 @@ public abstract class AbstractRecordReader implements RecordReader {
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return GroupScan.ALL_COLUMNS;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index a0dda01..edd91d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,8 +26,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.vector.ValueVector;
 
 public interface RecordReader extends AutoCloseable {
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+  long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+  long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
   /**
    * Configure the RecordReader with the provided schema and the record batch that should be written to.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
index 68c3c36..31c0103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -222,4 +222,9 @@ public class StorageStrategy {
       fs.deleteOnExit(path);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "StorageStrategy[umask=" + umask + ", deleteOnExist=" + deleteOnExit + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 6945fff..7668130 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -66,7 +66,7 @@ import org.joda.time.DateTimeConstants;
 /**
  * A RecordReader implementation for Avro data files.
  *
- * @see RecordReader
+ * @see org.apache.drill.exec.store.RecordReader
  */
 public class AvroRecordReader extends AbstractRecordReader {
 
@@ -398,4 +398,17 @@ public class AvroRecordReader extends AbstractRecordReader {
       }
     }
   }
+
+  @Override
+  public String toString() {
+    long currentPosition = -1L;
+    try {
+      currentPosition = reader.tell();
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: {}", e.getMessage());
+    }
+    return "AvroRecordReader[File=" + hadoop
+        + ", Position=" + currentPosition
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
index 5d9e105..2580010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -54,6 +53,8 @@ public class BsonRecordReader {
   private final boolean readNumbersAsDouble;
   protected DrillBuf workBuf;
   private String currentFieldName;
+  // Used for error context
+  private BsonReader reader;
 
   public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
     this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
@@ -67,6 +68,7 @@ public class BsonRecordReader {
   }
 
   public void write(ComplexWriter writer, BsonReader reader) throws IOException {
+    this.reader = reader;
     reader.readStartDocument();
     BsonType readBsonType = reader.getCurrentBsonType();
     switch (readBsonType) {
@@ -364,17 +366,20 @@ public class BsonRecordReader {
     }
   }
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field,
-      String msg, Object... args) {
-    return null;
-  }
-
-  public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) {
-    return null;
-  }
-
   private void ensure(final int length) {
     workBuf = workBuf.reallocIfNeeded(length);
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("BsonRecordReader[");
+    if (reader != null) {
+      sb.append("Name=")
+          .append(reader.getCurrentName())
+          .append(", Type=")
+          .append(reader.getCurrentBsonType());
+    }
+    sb.append(']');
+    return sb.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 7a4e4a2..379e2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -103,4 +103,12 @@ public class EasyWriter extends AbstractWriter {
   public int getOperatorType() {
     return formatPlugin.getWriterOperatorType();
   }
+
+  @Override
+  public String toString() {
+    return "EasyWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4b8bbf8..62ace66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -104,7 +104,7 @@ public class JSONRecordReader extends AbstractRecordReader {
         "One of inputPath or embeddedContent must be set but not both."
         );
 
-    if(inputPath != null) {
+    if (inputPath != null) {
       this.hadoopPath = new Path(inputPath);
     } else {
       this.embeddedContent = embeddedContent;
@@ -126,9 +126,11 @@ public class JSONRecordReader extends AbstractRecordReader {
   public String toString() {
     return super.toString()
         + "[hadoopPath = " + hadoopPath
+        + ", currentRecord=" + currentRecordNumberInFile()
+        + ", jsonReader=" + jsonReader
         + ", recordCount = " + recordCount
         + ", parseErrorCount = " + parseErrorCount
-         + ", runningRecordCount = " + runningRecordCount + ", ...]";
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
   }
 
   @Override
@@ -162,9 +164,9 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   private void setupParser() throws IOException {
-    if(hadoopPath != null){
+    if (hadoopPath != null) {
       jsonReader.setSource(stream);
-    }else{
+    } else {
       jsonReader.setSource(embeddedContent);
     }
     jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
@@ -182,7 +184,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     }
 
     UserException.Builder exceptionBuilder = UserException.dataReadError(e)
-            .message("%s - %s", suffix, message);
+        .message("%s - %s", suffix, message);
     if (columnNr > 0) {
       exceptionBuilder.pushContext("Column ", columnNr);
     }
@@ -205,36 +207,32 @@ public class JSONRecordReader extends AbstractRecordReader {
     writer.reset();
     recordCount = 0;
     parseErrorCount = 0;
-    if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+    if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
       return recordCount;
     }
-    outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
-      try{
+    while (recordCount < DEFAULT_ROWS_PER_BATCH) {
+      try {
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
-        if(write == ReadState.WRITE_SUCCEED){
+        if (write == ReadState.WRITE_SUCCEED) {
           recordCount++;
-        }
-        else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-          if(skipMalformedJSONRecords == false){
-            handleAndRaise("Error parsing JSON", new Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1)));
+        } else if (write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+          if (!skipMalformedJSONRecords) {
+            handleAndRaise("Error parsing JSON", new Exception());
           }
           ++parseErrorCount;
-          if(printSkippedMalformedJSONRecordLineNumber){
-            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount+parseErrorCount));
+          if (printSkippedMalformedJSONRecordLineNumber) {
+            logger.debug("Error parsing JSON in {}: line number: {}", hadoopPath.getName(), recordCount + parseErrorCount);
           }
-          if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-            break outside;
+          if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+            break;
           }
+        } else {
+          break;
         }
-        else{
-          break outside;
-        }
+      } catch (IOException ex) {
+        handleAndRaise("Error parsing JSON", ex);
       }
-      catch(IOException ex)
-        {
-           handleAndRaise("Error parsing JSON", ex);
-        }
     }
     // Skip empty json file with 0 row.
     // Only when data source has > 0 row, ensure the batch has one field.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index fba80e5..adab033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 public interface JsonProcessor {
 
-  public static enum ReadState {
+  enum ReadState {
     END_OF_STREAM,
     JSON_RECORD_PARSE_ERROR,
     JSON_RECORD_PARSE_EOF_ERROR,
@@ -41,17 +41,11 @@ public interface JsonProcessor {
 
   void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String message);
 
-  public UserException.Builder getExceptionWithContext(Throwable exception,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(Throwable exception, String message);
 
-  public boolean ignoreJSONParseError();
+  boolean ignoreJSONParseError();
 
-  public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
+  void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index aaa74ae..48a1464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
 
+import com.fasterxml.jackson.core.JsonLocation;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -49,8 +50,12 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
 
   protected JsonParser parser;
   protected DrillBuf workBuf;
-  protected JsonToken lastSeenJsonToken = null;
-  boolean ignoreJSONParseErrors = false; // default False
+  protected JsonToken lastSeenJsonToken;
+  boolean ignoreJSONParseErrors;
+  /**
+   * The name of the current field being parsed. Used in error messages.
+   */
+  protected String currentFieldName;
 
   /**
    *
@@ -90,26 +95,31 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(
-      UserException.Builder exceptionBuilder, String field, String msg,
-      Object... args) {
-    if (msg != null) {
-      exceptionBuilder.message(msg, args);
-    }
-    if (field != null) {
-      exceptionBuilder.pushContext("Field ", field);
+  public String toString() {
+    JsonLocation location = parser.getCurrentLocation();
+    return getClass().getSimpleName() + "[Line=" + location.getLineNr()
+        + ", Column=" + (location.getColumnNr() + 1)
+        + ", Field=" + getCurrentField()
+        + "]";
+  }
+
+  @Override
+  public UserException.Builder getExceptionWithContext(UserException.Builder builder, String message) {
+    builder.message(message);
+    JsonLocation location = parser.getCurrentLocation();
+    builder.addContext("Line", location.getLineNr())
+        .addContext("Column", location.getColumnNr() + 1);
+    String fieldName = getCurrentField();
+    if (fieldName != null) {
+      builder.addContext("Field", fieldName);
     }
-    exceptionBuilder.pushContext("Column ",
-        parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ",
-        parser.getCurrentLocation().getLineNr());
-    return exceptionBuilder;
+    return builder;
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(Throwable e,
-      String field, String msg, Object... args) {
+  public UserException.Builder getExceptionWithContext(Throwable e, String message) {
     UserException.Builder exceptionBuilder = UserException.dataReadError(e);
-    return getExceptionWithContext(exceptionBuilder, field, msg, args);
+    return getExceptionWithContext(exceptionBuilder, message);
   }
 
   /*
@@ -138,4 +148,8 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
     }
     return JsonExceptionProcessingState.PROC_SUCCEED;
   }
+
+  protected String getCurrentField() {
+    return currentFieldName;
+  }
 }
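
A minimal sketch of a call site for the simplified two-argument
getExceptionWithContext(Throwable, String): the processor now derives the Line,
Column and Field context itself from the parser, so the caller passes only a
message. The JsonReadExample class and its readRecord() method are hypothetical;
only the builder call and build(logger) reflect the API shown above.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import org.apache.drill.exec.store.easy.json.JsonProcessor;

    class JsonReadExample {
      private static final Logger logger = LoggerFactory.getLogger(JsonReadExample.class);

      void readRecord(JsonProcessor jsonProcessor) {
        try {
          // ... advance the parser and write the next record ...
        } catch (Exception e) {
          // Wraps e in a UserException whose context includes Line, Column and,
          // when known, the current Field name.
          throw jsonProcessor.getExceptionWithContext(e, "Error parsing JSON")
              .build(logger);
        }
      }
    }
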
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index a5e9f1a..0f92ec5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonToken;
 
 import io.netty.buffer.DrillBuf;
 
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
 public class CountingJsonReader extends BaseJsonProcessor {
@@ -41,18 +39,14 @@ public class CountingJsonReader extends BaseJsonProcessor {
         token = parser.nextToken();
       }
       lastSeenJsonToken = null;
+      if (token == JsonToken.FIELD_NAME) {
+        currentFieldName = parser.getText();
+      }
       if (!parser.hasCurrentToken()) {
         return ReadState.END_OF_STREAM;
       } else if (token != JsonToken.START_OBJECT) {
         throw new com.fasterxml.jackson.core.JsonParseException(
-            parser,
-            String
-                .format(
-                    "Cannot read from the middle of a record. Current token was %s ",
-                    token));
-        // throw new
-        // IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s",
-        // token));
+            parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
       }
       writer.rootAsMap().bit("count").writeBit(1);
       parser.skipChildren();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
index 3d88b1a..549df82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
@@ -163,4 +163,19 @@ public class SequenceFileRecordReader extends AbstractRecordReader {
       logger.warn("Exception closing reader: {}", e);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    long position = -1L;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: {}", e.getMessage());
+    }
+    return "SequenceFileRecordReader[File=" + split.getPath()
+        + ", Position=" + position
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 9a1d486..7aa9b04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -216,7 +216,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
     } catch (IOException | TextParsingException e) {
       throw UserException.dataReadError(e)
           .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
-            split.getPath(), reader.getPos())
+              split.getPath(), reader.getPos())
           .build(logger);
     }
   }
@@ -248,4 +248,11 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
       logger.warn("Exception while closing stream.", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "CompliantTextRecordReader[File=" + split.getPath()
+        + ", reader=" + reader
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
index a181d42..7a9ed46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -500,4 +500,12 @@ final class TextReader {
     input.close();
   }
 
+  @Override
+  public String toString() {
+    return "TextReader[Line=" + context.currentLine()
+        + ", Column=" + context.currentChar()
+        + ", Record=" + context.currentRecord()
+        + ", Byte pos=" + getPos()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 18437df..f43bb88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -242,6 +242,14 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
       }
     }
 
+    @Override
+    public String toString() {
+      return "HttpdLogRecordReader[Path=" + work.getPath()
+          + ", Start=" + work.getStart()
+          + ", Length=" + work.getLength()
+          + ", Line=" + lineNumber.get()
+          + "]";
+    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 91f8b99..2a4b4fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -261,7 +261,7 @@ public class ImageRecordReader extends AbstractRecordReader {
   }
 
   private void processXmpDirectory(final MapWriter writer, final XmpDirectory directory) {
-    HashSet<String> listItems = new HashSet();
+    HashSet<String> listItems = new HashSet<>();
     XMPMeta xmpMeta = directory.getXMPMeta();
     if (xmpMeta != null) {
       try {
@@ -490,4 +490,9 @@ public class ImageRecordReader extends AbstractRecordReader {
       metadataStream.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return "ImageRecordReader[Path=" + hadoopPath.toUri().getPath() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index 56bc1cc..e5d1dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -761,4 +761,11 @@ public class LogRecordReader extends AbstractRecordReader {
       reader = null;
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "LogRecordReader[File=" + fileWork.getPath()
+        + ", Line=" + rowIndex
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 1cd393b..aea3218 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -121,4 +121,11 @@ public class ParquetWriter extends AbstractWriter {
     return CoreOperatorType.PARQUET_WRITER_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "ParquetWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2ada17d..17cf8c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -348,4 +348,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
       return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
     }
   }
+
+  @Override
+  public String toString() {
+    return "ParquetRecordReader[File=" + hadoopPath.toUri()
+        + ", Row group index=" + rowGroupIndex
+        + ", Records in row group=" + footer.getBlocks().get(rowGroupIndex).getRowCount()
+        + ", Total records read=" + (readState != null ? readState.recordsRead() : -1)
+        + ", Metadata" + footer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0c69d6d..7108ca6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -344,4 +344,9 @@ public class DrillParquetReader extends AbstractRecordReader {
       this.type = type;
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillParquetReader[pageReadStore=" + pageReadStore + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 794687f..d688f3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -396,4 +396,9 @@ public class PcapRecordReader extends AbstractRecordReader {
           .setSafe(count, value, 0, value.remaining());
     }
   }
+
+  @Override
+  public String toString() {
+    return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index d2cd9a8..0db17fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -232,4 +232,13 @@ public class DrillTextRecordReader extends AbstractRecordReader {
       logger.warn("Exception closing reader: {}", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillTextRecordReader[File=" + split.getPath()
+        + ", Record=" + (totalRecordsRead + 1)
+        + ", Start=" + split.getStart()
+        + ", Length=" + split.getLength()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 2810c04..aaa9806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -70,10 +70,6 @@ public class JsonReader extends BaseJsonProcessor {
    * outer list.
    */
   private boolean inOuterList;
-  /**
-   * The name of the current field being parsed. For Error messages.
-   */
-  private String currentFieldName;
 
   private FieldSelection selection;
 
@@ -214,10 +210,9 @@ public class JsonReader extends BaseJsonProcessor {
       case WRITE_SUCCEED:
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while reading JSON. (Got an invalid read state %s )",
-            readState.toString()).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+            .build(logger);
       }
     } catch (com.fasterxml.jackson.core.JsonParseException ex) {
       if (ignoreJSONParseError()) {
@@ -236,13 +231,10 @@ public class JsonReader extends BaseJsonProcessor {
   private void confirmLast() throws IOException {
     parser.nextToken();
     if (!parser.isClosed()) {
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
-          .message(
-              "Drill attempted to unwrap a toplevel list "
-                  + "in your document.  However, it appears that there is trailing content after this top level list.  Drill only "
-                  + "supports querying a set of distinct maps or a single json array with multiple inner maps.")
-          .build(logger);
+      String message = "Drill attempted to unwrap a toplevel list in your document. "
+          + "However, it appears that there is trailing content after this top level list.  Drill only "
+          + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+      throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
     }
   }
 
@@ -255,11 +247,9 @@ public class JsonReader extends BaseJsonProcessor {
       break;
     case START_ARRAY:
       if (inOuterList) {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null)
-            .message(
-                "The top level of your document must either be a single array of maps or a set "
-                    + "of white space delimited maps.").build(logger);
+        String message = "The top level of your document must either be a single array of maps or a set "
+            + "of white space delimited maps.";
+        throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
       }
 
       if (skipOuterList) {
@@ -268,11 +258,9 @@ public class JsonReader extends BaseJsonProcessor {
           inOuterList = true;
           writeDataSwitch(writer.rootAsMap());
         } else {
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null)
-              .message(
-                  "The top level of your document must either be a single array of maps or a set "
-                      + "of white space delimited maps.").build(logger);
+          String message = "The top level of your document must either be a single array of maps or a set "
+              + "of white space delimited maps.";
+          throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
         }
 
       } else {
@@ -285,17 +273,14 @@ public class JsonReader extends BaseJsonProcessor {
         confirmLast();
         return ReadState.END_OF_STREAM;
       } else {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while parsing JSON.  Ran across unexpected %s.",
-            JsonToken.END_ARRAY).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
       }
 
     case NOT_AVAILABLE:
       return ReadState.END_OF_STREAM;
     default:
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
+      throw getExceptionWithContext(UserException.dataReadError(), null)
           .message(
               "Failure while parsing JSON.  Found token of [%s].  Drill currently only supports parsing "
                   + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
@@ -412,9 +397,9 @@ public class JsonReader extends BaseJsonProcessor {
           break;
 
         default:
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null).message("Unexpected token %s",
-              parser.getCurrentToken()).build(logger);
+          throw getExceptionWithContext(UserException.dataReadError(), null)
+              .message("Unexpected token %s", parser.getCurrentToken())
+              .build(logger);
         }
 
       }
@@ -478,8 +463,7 @@ public class JsonReader extends BaseJsonProcessor {
         break;
 
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
@@ -591,8 +575,7 @@ public class JsonReader extends BaseJsonProcessor {
               .build(logger);
         }
       } catch (Exception e) {
-        throw getExceptionWithContext(e, this.currentFieldName, null).build(
-            logger);
+        throw getExceptionWithContext(e, null).build(logger);
       }
     }
     list.endList();
@@ -637,8 +620,7 @@ public class JsonReader extends BaseJsonProcessor {
         handleString(parser, list);
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7cb07eb..a9e9e62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -207,6 +207,10 @@ public class FragmentExecutor implements Runnable {
   }
 
   private void cleanup(FragmentState state) {
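+    // For failed fragments, dump the state of the operator batches to the
+    // logs while the operators are still open.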
+    if (fragmentState.get() == FragmentState.FAILED) {
+      root.dumpBatches();
+    }
+
     closeOutResources();
 
     updateState(state);
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 89731ff..4bb1a22 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -269,4 +269,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   public long getRowCount() {
     return rowCount;
   }
+
+  @Override
+  public String toString() {
+    return "ColumnChunkIncReadStore[File=" + path.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
new file mode 100644
index 0000000..18ba61c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOperatorDump extends ClusterTest {
+
+  private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed";
+  private static final String ENTRY_DUMP_STARTED = "Batch dump started";
+
+  private LogFixture logFixture;
+  private EventAwareContextAppender appender;
+
+  @BeforeClass
+  public static void setupFiles() {
+    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+  }
+
+  @Before
+  public void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    appender = new EventAwareContextAppender();
+    logFixture = LogFixture.builder()
+        .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
+        .build();
+    startCluster(builder);
+  }
+
+  @After
+  public void tearDown() {
+    logFixture.close();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testScanBatchChecked() throws Exception {
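+    // Inject an OutOfMemoryException at ScanBatch's "next-allocate" site to force a failure
+    // and exercise the batch dump.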
+    String exceptionDesc = "next-allocate";
+    final String controls = Controls.newBuilder()
+        .addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select * from dfs.`multilevel/parquet` limit 100";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(exceptionDesc));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ScanBatch.class.getName());
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testExternalSortUnchecked() throws Exception {
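+    // The injection site is keyed by the legacy ExternalSortBatch class, while the managed
+    // sort supplies the injection constant.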
+    Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class;
+    final String controls = Controls.newBuilder()
+        .addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName());
+      throw e;
+    }
+  }
+
+  private void validateContainsEntries(String[] entries, String expectedClassName) {
+    if (entries == null) {
+      entries = new String[0];
+    }
+    List<String> messages = appender.getMessages();
+    List<String> entryList = new ArrayList<>(entries.length);
+    Collections.addAll(entryList, entries);
+    Iterator<String> it = entryList.iterator();
+    while (it.hasNext()) {
+      String entry = it.next();
+      for (String message : messages) {
+        if (message.contains(entry)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+    assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty());
+
+    Set<String> loggerNames = appender.getLoggerNames();
+    assertTrue(String.format("Entry for class %s was not found", expectedClassName),
+        loggerNames.contains(expectedClassName));
+  }
+
+  // ConsoleAppender which stores logged events
+  private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> {
+
+    private List<ILoggingEvent> events = new ArrayList<>();
+
+    @Override
+    protected void append(ILoggingEvent e) {
+      events.add(e);
+    }
+
+    List<String> getMessages() {
+      return events.stream()
+          .map(ILoggingEvent::getMessage)
+          .collect(Collectors.toList());
+    }
+
+    Set<String> getLoggerNames() {
+      return events.stream()
+          .map(ILoggingEvent::getLoggerName)
+          .collect(Collectors.toSet());
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 34d735e..94e0c0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -288,6 +288,15 @@ public class MockRecordBatch implements CloseableRecordBatch {
     this.limitWithUnnest = limitWithUnnest;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   public static class Builder {
     private final List<RowSet> rowSets = new ArrayList<>();
     private final List<IterOutcome> iterOutcomes = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 6d5b666..a176646 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -106,6 +106,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @Override
+  public void dumpBatches() {
+    screenRoot.dumpBatches();
+  }
+
+  @Override
   public void close() throws Exception {
     screenRoot.close();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index c7105f9..aefa28a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -162,6 +162,15 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
 
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   @Override public int getRecordCount() {
     return 0;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 6f1e6e0..c05cdfd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -121,6 +121,15 @@ public class BloomFilterTest {
     public Iterator<VectorWrapper<?>> iterator() {
       return null;
     }
+
+    @Override
+    public void dump() {
+    }
+
+    @Override
+    public boolean hasFailed() {
+      return false;
+    }
   }
 
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b54b0b0..b62a188 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -86,6 +86,7 @@ public class LogFixture implements AutoCloseable {
     private String consoleFormat = DEFAULT_CONSOLE_FORMAT;
     private boolean logToConsole;
     private List<LogSpec> loggers = new ArrayList<>();
+    private ConsoleAppender<ILoggingEvent> appender;
 
     /**
      * Send all enabled logging to the console (if not already configured.) Some
@@ -102,6 +103,11 @@ public class LogFixture implements AutoCloseable {
       return this;
     }
 
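+    /**
+     * Send logging to the given console appender, using the defined format.
+     * Lets tests supply an appender that captures logged events.
+     */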
+    public LogFixtureBuilder toConsole(ConsoleAppender<ILoggingEvent> appender, String format) {
+      this.appender = appender;
+      return toConsole(format);
+    }
+
     /**
      * Send logging to the console using the defined format.
      *
@@ -195,7 +201,7 @@ public class LogFixture implements AutoCloseable {
 
   private void setupConsole(LogFixtureBuilder builder) {
     drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
-    if (drillLogger.getAppender("STDOUT") != null) {
+    if (builder.appender == null && drillLogger.getAppender("STDOUT") != null) {
       return;
     }
     LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
@@ -204,10 +210,10 @@ public class LogFixture implements AutoCloseable {
     ple.setContext(lc);
     ple.start();
 
-    appender = new ConsoleAppender<>( );
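+    // Use the caller-supplied appender when one was given; otherwise create a default console appender.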
+    appender = builder.appender == null ? new ConsoleAppender<>() : builder.appender;
     appender.setContext(lc);
     appender.setName("Console");
-    appender.setEncoder( ple );
+    appender.setEncoder(ple);
     appender.start();
 
     Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);