You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/10/01 12:35:38 UTC

[drill] branch master updated (3bc3b66 -> 3b1ae15)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 3bc3b66  DRILL-5782: Web UI: do not attempt to build visualized plan when plan is absent
     new 98e5de3  DRILL-6724: Dump operator context to logs when error occurs during query execution
     new 3b1ae15  DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../drill/common/exceptions/UserException.java     |   4 +-
 .../common/exceptions/UserExceptionContext.java    |  19 ++-
 .../store/mapr/db/json/MaprDBJsonRecordReader.java |  17 ++-
 .../drill/exec/store/hbase/HBaseRecordReader.java  |   5 +
 .../store/hive/readers/HiveAbstractReader.java     |  15 ++
 .../drill/exec/store/jdbc/JdbcRecordReader.java    |   8 +-
 .../drill/exec/store/kafka/KafkaRecordReader.java  |  26 +++-
 .../store/kafka/decoders/JsonMessageReader.java    |   4 +
 .../drill/exec/store/kudu/KuduRecordReader.java    |  11 ++
 .../apache/drill/exec/store/kudu/KuduWriter.java   |   5 +
 .../drill/exec/store/mongo/MongoRecordReader.java  |   5 +
 .../drill/exec/physical/config/HashAggregate.java  |   8 ++
 .../physical/config/OrderedPartitionSender.java    |  11 ++
 .../drill/exec/physical/config/PartitionLimit.java |   5 +
 .../apache/drill/exec/physical/config/Sort.java    |   7 +
 .../apache/drill/exec/physical/config/TopN.java    |   7 +
 .../drill/exec/physical/config/WindowPOP.java      |  16 +++
 .../drill/exec/physical/impl/BaseRootExec.java     |  23 +++
 .../apache/drill/exec/physical/impl/RootExec.java  |   6 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  30 +++-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |   9 +-
 .../exec/physical/impl/WriterRecordBatch.java      |   6 +
 .../exec/physical/impl/aggregate/HashAggBatch.java |   9 ++
 .../physical/impl/aggregate/HashAggTemplate.java   |   9 ++
 .../impl/aggregate/SpilledRecordbatch.java         |  35 ++++-
 .../physical/impl/aggregate/StreamingAggBatch.java |   6 +
 .../impl/aggregate/StreamingAggTemplate.java       |  10 ++
 .../physical/impl/filter/FilterRecordBatch.java    |   8 ++
 .../exec/physical/impl/filter/FilterTemplate2.java |   7 +
 .../exec/physical/impl/filter/FilterTemplate4.java |   6 +
 .../impl/filter/RuntimeFilterRecordBatch.java      |   9 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |   6 +
 .../physical/impl/flatten/FlattenTemplate.java     |  11 ++
 .../exec/physical/impl/join/HashJoinBatch.java     |  66 ++++++---
 .../physical/impl/join/HashJoinProbeTemplate.java  |  13 ++
 .../exec/physical/impl/join/LateralJoinBatch.java  |   8 ++
 .../exec/physical/impl/join/MergeJoinBatch.java    |   7 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |   8 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |  74 +++++-----
 .../impl/limit/PartitionLimitRecordBatch.java      |  10 +-
 .../impl/mergereceiver/MergingRecordBatch.java     |   8 ++
 .../OrderedPartitionRecordBatch.java               |   6 +
 .../impl/producer/ProducerConsumerBatch.java       |   5 +
 .../physical/impl/project/ProjectRecordBatch.java  |   6 +
 .../physical/impl/project/ProjectorTemplate.java   |   6 +
 .../impl/protocol/OperatorRecordBatch.java         |  18 ++-
 .../drill/exec/physical/impl/sort/SortBatch.java   |   4 +
 .../exec/physical/impl/sort/SortTemplate.java      |   4 +
 .../impl/svremover/RemovingRecordBatch.java        |   5 +
 .../exec/physical/impl/trace/TraceRecordBatch.java |   6 +-
 .../physical/impl/union/UnionAllRecordBatch.java   |   5 +
 .../exec/physical/impl/unnest/UnnestImpl.java      |  10 ++
 .../physical/impl/unnest/UnnestRecordBatch.java    |   7 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  36 +++--
 .../validate/IteratorValidatorBatchIterator.java   |  13 +-
 .../physical/impl/window/FrameSupportTemplate.java |  11 ++
 .../impl/window/NoFrameSupportTemplate.java        |  11 ++
 .../exec/physical/impl/window/WindowDataBatch.java |   5 +
 .../impl/window/WindowFrameRecordBatch.java        |   7 +
 .../physical/impl/xsort/ExternalSortBatch.java     |   4 +
 .../exec/physical/impl/xsort/MSortTemplate.java    |   9 ++
 .../impl/xsort/SingleBatchSorterTemplate.java      |   5 +
 .../impl/xsort/managed/ExternalSortBatch.java      |   7 +
 .../physical/impl/xsort/managed/SortConfig.java    |   9 ++
 .../exec/physical/impl/xsort/managed/SortImpl.java |   8 ++
 .../exec/record/AbstractBinaryRecordBatch.java     |   2 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  50 +++++--
 .../record/AbstractTableFunctionRecordBatch.java   |   1 -
 .../org/apache/drill/exec/record/RecordBatch.java  |  15 ++
 .../drill/exec/record/RecordBatchLoader.java       |   8 ++
 .../apache/drill/exec/record/RecordIterator.java   |  14 ++
 .../apache/drill/exec/record/SchemalessBatch.java  |  10 ++
 .../drill/exec/record/SimpleRecordBatch.java       |  13 ++
 .../exec/record/selection/SelectionVector4.java    |   9 ++
 .../drill/exec/store/AbstractRecordReader.java     |   1 -
 .../org/apache/drill/exec/store/RecordReader.java  |   4 +-
 .../apache/drill/exec/store/StorageStrategy.java   |   7 +-
 .../drill/exec/store/avro/AvroRecordReader.java    |  15 +-
 .../drill/exec/store/bson/BsonRecordReader.java    |  25 ++--
 .../drill/exec/store/dfs/easy/EasyWriter.java      |   8 ++
 .../exec/store/easy/json/JSONRecordReader.java     |  46 +++---
 .../drill/exec/store/easy/json/JsonProcessor.java  |  16 +--
 .../store/easy/json/reader/BaseJsonProcessor.java  |  48 ++++---
 .../store/easy/json/reader/CountingJsonReader.java |  14 +-
 .../sequencefile/SequenceFileRecordReader.java     |  17 ++-
 .../text/compliant/CompliantTextRecordReader.java  |   9 +-
 .../exec/store/easy/text/compliant/TextReader.java |   8 ++
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   8 ++
 .../drill/exec/store/image/ImageRecordReader.java  |   7 +-
 .../drill/exec/store/log/LogRecordReader.java      |   9 +-
 .../drill/exec/store/parquet/ParquetWriter.java    |   7 +
 .../parquet/columnreaders/ParquetRecordReader.java |  10 ++
 .../exec/store/parquet2/DrillParquetReader.java    |   5 +
 .../drill/exec/store/pcap/PcapRecordReader.java    |   5 +
 .../exec/store/text/DrillTextRecordReader.java     |   9 ++
 .../drill/exec/vector/complex/fn/JsonReader.java   |  62 +++-----
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  |   2 +-
 .../drill/exec/work/fragment/FragmentExecutor.java |   4 +
 .../parquet/hadoop/ColumnChunkIncReadStore.java    |   5 +
 .../java/org/apache/drill/TestOperatorDump.java    | 159 +++++++++++++++++++++
 .../drill/exec/physical/impl/MockRecordBatch.java  |   9 ++
 .../drill/exec/physical/impl/SimpleRootExec.java   |   5 +
 .../physical/impl/join/TestHashJoinOutcome.java    |  43 ++++++
 .../physical/impl/unnest/MockLateralJoinBatch.java |   9 ++
 .../drill/exec/work/filter/BloomFilterTest.java    |   9 ++
 .../java/org/apache/drill/test/LogFixture.java     |  12 +-
 106 files changed, 1249 insertions(+), 234 deletions(-)
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java


[drill] 01/02: DRILL-6724: Dump operator context to logs when error occurs during query execution

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 98e5de3b5af862779244bac8329852b3c9a901df
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Fri Aug 31 18:20:34 2018 +0300

    DRILL-6724: Dump operator context to logs when error occurs during query execution
    
    closes #1455
---
 .../drill/common/exceptions/UserException.java     |   4 +-
 .../common/exceptions/UserExceptionContext.java    |  19 ++-
 .../store/mapr/db/json/MaprDBJsonRecordReader.java |  17 ++-
 .../drill/exec/store/hbase/HBaseRecordReader.java  |   5 +
 .../store/hive/readers/HiveAbstractReader.java     |  15 ++
 .../drill/exec/store/jdbc/JdbcRecordReader.java    |   8 +-
 .../drill/exec/store/kafka/KafkaRecordReader.java  |  26 +++-
 .../store/kafka/decoders/JsonMessageReader.java    |   4 +
 .../drill/exec/store/kudu/KuduRecordReader.java    |  11 ++
 .../apache/drill/exec/store/kudu/KuduWriter.java   |   5 +
 .../drill/exec/store/mongo/MongoRecordReader.java  |   5 +
 .../drill/exec/physical/config/HashAggregate.java  |   8 ++
 .../physical/config/OrderedPartitionSender.java    |  11 ++
 .../drill/exec/physical/config/PartitionLimit.java |   5 +
 .../apache/drill/exec/physical/config/Sort.java    |   7 +
 .../apache/drill/exec/physical/config/TopN.java    |   7 +
 .../drill/exec/physical/config/WindowPOP.java      |  16 +++
 .../drill/exec/physical/impl/BaseRootExec.java     |  23 +++
 .../apache/drill/exec/physical/impl/RootExec.java  |   6 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  30 +++-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |   9 +-
 .../exec/physical/impl/WriterRecordBatch.java      |   6 +
 .../exec/physical/impl/aggregate/HashAggBatch.java |   9 ++
 .../physical/impl/aggregate/HashAggTemplate.java   |   9 ++
 .../impl/aggregate/SpilledRecordbatch.java         |  35 ++++-
 .../physical/impl/aggregate/StreamingAggBatch.java |   6 +
 .../impl/aggregate/StreamingAggTemplate.java       |  10 ++
 .../physical/impl/filter/FilterRecordBatch.java    |   8 ++
 .../exec/physical/impl/filter/FilterTemplate2.java |   7 +
 .../exec/physical/impl/filter/FilterTemplate4.java |   6 +
 .../impl/filter/RuntimeFilterRecordBatch.java      |   9 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |   6 +
 .../physical/impl/flatten/FlattenTemplate.java     |  11 ++
 .../exec/physical/impl/join/HashJoinBatch.java     |   6 +
 .../physical/impl/join/HashJoinProbeTemplate.java  |  13 ++
 .../exec/physical/impl/join/LateralJoinBatch.java  |   8 ++
 .../exec/physical/impl/join/MergeJoinBatch.java    |   7 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |   8 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |  74 +++++-----
 .../impl/limit/PartitionLimitRecordBatch.java      |  10 +-
 .../impl/mergereceiver/MergingRecordBatch.java     |   8 ++
 .../OrderedPartitionRecordBatch.java               |   6 +
 .../impl/producer/ProducerConsumerBatch.java       |   5 +
 .../physical/impl/project/ProjectRecordBatch.java  |   6 +
 .../physical/impl/project/ProjectorTemplate.java   |   6 +
 .../impl/protocol/OperatorRecordBatch.java         |  18 ++-
 .../drill/exec/physical/impl/sort/SortBatch.java   |   4 +
 .../exec/physical/impl/sort/SortTemplate.java      |   4 +
 .../impl/svremover/RemovingRecordBatch.java        |   5 +
 .../exec/physical/impl/trace/TraceRecordBatch.java |   6 +-
 .../physical/impl/union/UnionAllRecordBatch.java   |   5 +
 .../exec/physical/impl/unnest/UnnestImpl.java      |  10 ++
 .../physical/impl/unnest/UnnestRecordBatch.java    |   7 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  36 +++--
 .../validate/IteratorValidatorBatchIterator.java   |  13 +-
 .../physical/impl/window/FrameSupportTemplate.java |  11 ++
 .../impl/window/NoFrameSupportTemplate.java        |  11 ++
 .../exec/physical/impl/window/WindowDataBatch.java |   5 +
 .../impl/window/WindowFrameRecordBatch.java        |   7 +
 .../physical/impl/xsort/ExternalSortBatch.java     |   4 +
 .../exec/physical/impl/xsort/MSortTemplate.java    |   9 ++
 .../impl/xsort/SingleBatchSorterTemplate.java      |   5 +
 .../impl/xsort/managed/ExternalSortBatch.java      |   7 +
 .../physical/impl/xsort/managed/SortConfig.java    |   9 ++
 .../exec/physical/impl/xsort/managed/SortImpl.java |   8 ++
 .../drill/exec/record/AbstractRecordBatch.java     |  50 +++++--
 .../record/AbstractTableFunctionRecordBatch.java   |   1 -
 .../org/apache/drill/exec/record/RecordBatch.java  |  15 ++
 .../drill/exec/record/RecordBatchLoader.java       |   8 ++
 .../apache/drill/exec/record/RecordIterator.java   |  14 ++
 .../apache/drill/exec/record/SchemalessBatch.java  |  10 ++
 .../drill/exec/record/SimpleRecordBatch.java       |  13 ++
 .../exec/record/selection/SelectionVector4.java    |   9 ++
 .../drill/exec/store/AbstractRecordReader.java     |   1 -
 .../org/apache/drill/exec/store/RecordReader.java  |   4 +-
 .../apache/drill/exec/store/StorageStrategy.java   |   7 +-
 .../drill/exec/store/avro/AvroRecordReader.java    |  15 +-
 .../drill/exec/store/bson/BsonRecordReader.java    |  25 ++--
 .../drill/exec/store/dfs/easy/EasyWriter.java      |   8 ++
 .../exec/store/easy/json/JSONRecordReader.java     |  46 +++---
 .../drill/exec/store/easy/json/JsonProcessor.java  |  16 +--
 .../store/easy/json/reader/BaseJsonProcessor.java  |  48 ++++---
 .../store/easy/json/reader/CountingJsonReader.java |  14 +-
 .../sequencefile/SequenceFileRecordReader.java     |  17 ++-
 .../text/compliant/CompliantTextRecordReader.java  |   9 +-
 .../exec/store/easy/text/compliant/TextReader.java |   8 ++
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   8 ++
 .../drill/exec/store/image/ImageRecordReader.java  |   7 +-
 .../drill/exec/store/log/LogRecordReader.java      |   9 +-
 .../drill/exec/store/parquet/ParquetWriter.java    |   7 +
 .../parquet/columnreaders/ParquetRecordReader.java |  10 ++
 .../exec/store/parquet2/DrillParquetReader.java    |   5 +
 .../drill/exec/store/pcap/PcapRecordReader.java    |   5 +
 .../exec/store/text/DrillTextRecordReader.java     |   9 ++
 .../drill/exec/vector/complex/fn/JsonReader.java   |  62 +++-----
 .../drill/exec/work/fragment/FragmentExecutor.java |   4 +
 .../parquet/hadoop/ColumnChunkIncReadStore.java    |   5 +
 .../java/org/apache/drill/TestOperatorDump.java    | 159 +++++++++++++++++++++
 .../drill/exec/physical/impl/MockRecordBatch.java  |   9 ++
 .../drill/exec/physical/impl/SimpleRootExec.java   |   5 +
 .../physical/impl/unnest/MockLateralJoinBatch.java |   9 ++
 .../drill/exec/work/filter/BloomFilterTest.java    |   9 ++
 .../java/org/apache/drill/test/LogFixture.java     |  12 +-
 103 files changed, 1165 insertions(+), 211 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 56cb935..3a3ca5a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -771,8 +771,10 @@ public class UserException extends DrillRuntimeException {
    * @return generated user error message
    */
   private String generateMessage(boolean includeErrorIdAndIdentity) {
+    boolean seeLogsMessage = errorType == DrillPBError.ErrorType.INTERNAL_ERROR
+        || errorType == DrillPBError.ErrorType.SYSTEM;
     return errorType + " ERROR: " + super.getMessage() + "\n\n" +
-        context.generateContextMessage(includeErrorIdAndIdentity);
+        context.generateContextMessage(includeErrorIdAndIdentity, seeLogsMessage);
   }
 
 }
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
index fa2c437..271462a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.proto.CoordinationProtos;
  */
 class UserExceptionContext {
 
+  private static final String NEW_LINE = System.lineSeparator();
+
   private final String errorId;
   private final List<String> contextList;
 
@@ -133,17 +135,26 @@ class UserExceptionContext {
    * generate a context message
    * @return string containing all context information concatenated
    */
-  String generateContextMessage(boolean includeErrorIdAndIdentity) {
+  String generateContextMessage(boolean includeErrorIdAndIdentity, boolean includeSeeLogsMessage) {
     StringBuilder sb = new StringBuilder();
 
     for (String context : contextList) {
-      sb.append(context).append("\n");
+      sb.append(context)
+          .append(NEW_LINE);
+    }
+
+    if (includeSeeLogsMessage) {
+      sb.append(NEW_LINE)
+          .append("Please, refer to logs for more information.")
+          .append(NEW_LINE);
     }
 
     if (includeErrorIdAndIdentity) {
       // add identification infos
-      sb.append("\n[Error Id: ");
-      sb.append(errorId).append(" ");
+      sb.append(NEW_LINE)
+          .append("[Error Id: ")
+          .append(errorId)
+          .append(" ");
       if (endpoint != null) {
         sb.append("on ")
             .append(endpoint.getAddress())
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 3c7ca8e..b68f574 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private final String tableName;
   private OperatorContext operatorContext;
   private VectorContainerWriter vectorWriter;
+  private DBDocumentReaderBase reader;
 
   private DrillBuf buffer;
 
@@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     vectorWriter.reset();
 
     int recordCount = 0;
-    DBDocumentReaderBase reader = null;
+    reader = null;
 
     while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
       vectorWriter.setPosition(recordCount);
@@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       table.close();
     }
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=")
+        .append(table.getPath());
+    if (reader != null) {
+      sb.append(", Document ID=")
+          .append(IdCodec.asString(reader.getId()));
+    }
+    sb.append(", reader=")
+        .append(reader)
+        .append(']');
+    return sb.toString();
+  }
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 86038c4..2db1d02 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     return rowCount < TARGET_RECORD_COUNT &&
         operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
   }
+
+  @Override
+  public String toString() {
+    return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]";
+  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
index 354a61e..ba1cd30 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
     }
   }
 
+  @Override
+  public String toString() {
+    long position = -1;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: " + e.getMessage());
+    }
+    return getClass().getSimpleName() + "[Database=" + table.getDbName()
+        + ", Table=" + table.getTableName()
+        + ", Position=" + position
+        + "]";
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 1b6e211..cd732a6 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader {
     AutoCloseables.close(resultSet, statement, connection);
   }
 
+  @Override
+  public String toString() {
+    return "JdbcRecordReader[sql=" + sql
+        + ", Plugin=" + storagePluginName
+        + "]";
+  }
+
   private abstract class Copier<T extends ValueVector.Mutator> {
     protected final int columnIndex;
     protected final ResultSet result;
@@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader {
     }
 
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index d715ada..9559c3d 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
   private final String kafkaMsgReader;
+  private int currentMessageCount;
 
   public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
@@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader {
     writer.allocate();
     writer.reset();
     Stopwatch watch = Stopwatch.createStarted();
-    int messageCount = 0;
+    currentMessageCount = 0;
 
     try {
       while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
         ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
         currentOffset = consumerRecord.offset();
-        writer.setPosition(messageCount);
+        writer.setPosition(currentMessageCount);
         messageReader.readMessage(consumerRecord);
-        if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+        if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
           break;
         }
       }
 
       messageReader.ensureAtLeastOneField();
-      writer.setValueCount(messageCount);
-      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+      writer.setValueCount(currentMessageCount);
+      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
       logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
           currentOffset);
-      return messageCount;
+      return currentMessageCount;
     } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
       throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
     }
   }
@@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader {
     messageReader.close();
   }
 
+  @Override
+  public String toString() {
+    return "KafkaRecordReader[messageReader=" + messageReader
+        + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+        + ", currentOffset=" + currentOffset
+        + ", enableAllTextMode=" + enableAllTextMode
+        + ", readNumbersAsDouble=" + readNumbersAsDouble
+        + ", kafkaMsgReader=" + kafkaMsgReader
+        + ", currentMessageCount=" + currentMessageCount
+        + "]";
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a62357d..40e9e12 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader {
     }
   }
 
+  @Override
+  public String toString() {
+    return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 845738c..976b16d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader {
   private OutputMutator output;
   private OperatorContext context;
 
+  private String lastColumnName;
+  private Type lastColumnType;
+
   private static class ProjectedColumnInfo {
     int index;
     ValueVector vv;
@@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader {
 
       final String name = col.getName();
       final Type kuduType = col.getType();
+      lastColumnName = name;
+      lastColumnType = kuduType;
       MinorType minorType = TYPES.get(kuduType);
       if (minorType == null) {
         logger.warn("Ignoring column that is unsupported.", UserException
@@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader {
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    return "KuduRecordReader[Column=" + lastColumnName
+        + ", Type=" + lastColumnType
+        + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index d0fa158..7611576 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter {
   public KuduStoragePlugin getPlugin() {
     return plugin;
   }
+
+  @Override
+  public String toString() {
+    return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]";
+  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index a79e39a..f5d1f2e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader {
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    Object reader = isBsonRecordReader ? bsonReader : jsonReader;
+    return "MongoRecordReader[reader=" + reader + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index da988de..521aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -106,4 +106,12 @@ public class HashAggregate extends AbstractSingle {
     return queryContext == null ||
       1 < (int) queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
   }
+
+  @Override
+  public String toString() {
+    return "HashAggregate[groupByExprs=" + groupByExprs
+        + ", aggrExprs=" + aggrExprs
+        + ", cardinality=" + cardinality
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index d28d563..320bc6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -106,4 +106,15 @@ public class OrderedPartitionSender extends AbstractSender {
   public int getOperatorType() {
     return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "OrderedPartitionSender[orderings=" + orderings
+        + ", ref=" + ref
+        + ", sendingWidth=" + sendingWidth
+        + ", recordsToSample=" + recordsToSample
+        + ", samplingFactor=" + samplingFactor
+        + ", completionFactor=" + completionFactor
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
index 29f8bb2..4ea710d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -59,4 +59,9 @@ public class PartitionLimit extends Limit {
   public int getOperatorType() {
     return CoreOperatorType.PARTITION_LIMIT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "PartitionLimit[partitionColumn=" + partitionColumn + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 85ef7da..5d65c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -71,4 +71,11 @@ public class Sort extends AbstractSingle{
   public int getOperatorType() {
     return CoreOperatorType.OLD_SORT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "Sort[orderings=" + orderings
+        + ", reverse=" + reverse
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
index 54d1b4d..d2a87d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
@@ -69,4 +69,11 @@ public class TopN extends Sort {
     return CoreOperatorType.TOP_N_SORT_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "TopN[orderings=" + orderings
+        + ", reverse=" + reverse
+        + ", limit=" + limit
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
index 543c09f..3ddaa7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -95,6 +95,17 @@ public class WindowPOP extends AbstractSingle {
     return frameUnitsRows;
   }
 
+  @Override
+  public String toString() {
+    return "WindowPOP[withins=" + withins
+        + ", aggregations=" + aggregations
+        + ", orderings=" + orderings
+        + ", frameUnitsRows=" + frameUnitsRows
+        + ", start=" + start
+        + ", end=" + end
+        + "]";
+  }
+
   @JsonTypeName("windowBound")
   public static class Bound {
     private final boolean unbounded;
@@ -117,6 +128,11 @@ public class WindowPOP extends AbstractSingle {
     public long getOffset() {
       return offset;
     }
+
+    @Override
+    public String toString() {
+      return "Bound[unbounded=" + unbounded + ", offset=" + offset + "]";
+    }
   }
 
   public static Bound newBound(RexWindowBound windowBound) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index e148278..9142a2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.DeferredException;
@@ -124,6 +125,28 @@ public abstract class BaseRootExec implements RootExec {
   }
 
   @Override
+  public void dumpBatches() {
+    final int numberOfBatchesToDump = 2;
+    logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
+    // As batches are stored in a 'flat' List there is a need to filter out the failed batch
+    // and a few of its parent (actual number of batches is set by a constant defined above)
+    List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
+    for (int i = operators.size() - 1; i >= 0; i--) {
+      CloseableRecordBatch batch = operators.get(i);
+      if (batch.hasFailed()) {
+        failedBatchStack.add(0, batch);
+        if (failedBatchStack.size() == numberOfBatchesToDump) {
+          break;
+        }
+      }
+    }
+    for (CloseableRecordBatch batch : failedBatchStack) {
+      batch.dump();
+    }
+    logger.error("Batch dump completed.");
+  }
+
+  @Override
   public void close() throws Exception {
     // We want to account for the time spent waiting here as Wait time in the operator profile
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index df0f89b..34f2131 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -44,4 +44,10 @@ public interface RootExec extends AutoCloseable {
    * @param handle The handle pointing to the downstream receiver that does not need anymore data.
    */
   void receivingFragmentFinished(FragmentHandle handle);
+
+  /**
+   * Dump failed batches' state preceded by its parent's state to logs. Invoked when there is a
+   * failure during fragment execution.
+   */
+  void dumpBatches();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cd24a4c..dc8dd0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,6 +83,10 @@ public class ScanBatch implements CloseableRecordBatch {
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
   private final RecordBatchStatsContext batchStatsContext;
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
+
   /**
    *
    * @param context
@@ -160,7 +164,8 @@ public class ScanBatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
     if (done) {
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
     oContext.getStats().startProcessing();
     try {
@@ -168,7 +173,8 @@ public class ScanBatch implements CloseableRecordBatch {
         if (currentReader == null && !getNextReaderIfHas()) {
           releaseAssets(); // All data has been read. Release resource.
           done = true;
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          return lastOutcome;
         }
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
         currentReader.allocate(mutator.fieldVectorMap());
@@ -191,7 +197,8 @@ public class ScanBatch implements CloseableRecordBatch {
           // This could happen when data sources have a non-trivial schema with 0 row.
           container.buildSchema(SelectionVectorMode.NONE);
           schema = container.getSchema();
-          return IterOutcome.OK_NEW_SCHEMA;
+          lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+          return lastOutcome;
         }
 
         // Handle case of same schema.
@@ -199,11 +206,13 @@ public class ScanBatch implements CloseableRecordBatch {
             continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
         } else {
           // return OK if recordCount > 0 && ! isNewSchema
-          return IterOutcome.OK;
+          lastOutcome = IterOutcome.OK;
+          return lastOutcome;
         }
       }
     } catch (OutOfMemoryException ex) {
       clearFieldVectorMap();
+      lastOutcome = IterOutcome.STOP;
       throw UserException.memoryError(ex).build(logger);
     } catch (ExecutionSetupException e) {
       if (currentReader != null) {
@@ -213,12 +222,15 @@ public class ScanBatch implements CloseableRecordBatch {
           logger.error("Close failed for reader " + currentReaderClassName, e2);
         }
       }
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(e)
           .addContext("Setup failed for", currentReaderClassName)
           .build(logger);
     } catch (UserException ex) {
+      lastOutcome = IterOutcome.STOP;
       throw ex;
     } catch (Exception ex) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
@@ -559,4 +571,14 @@ public class ScanBatch implements CloseableRecordBatch {
 
     return true;
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index ba4b94a..22dfdf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -74,6 +74,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
  * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
@@ -185,7 +186,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     // Reset the TopN state for next iteration
     resetTopNState();
 
-    try{
+    try {
       boolean incomingHasSv2 = false;
       switch (incoming.getSchema().getSelectionVectorMode()) {
         case NONE: {
@@ -693,4 +694,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       return sv4;
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " +
+        "batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 65d0c54..3a8485a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -209,4 +209,10 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     closeWriter();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WriterRecordBatch[container={}, popConfig={}, counter={}, fragmentUniqueId={}, schema={}]",
+        container, popConfig, counter, fragmentUniqueId, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 9de9aae..80d25ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -500,4 +501,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     wasKilled = true;
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " +
+            "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
+        container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema,
+        wasKilled, numGroupByExprs, numAggrExprs, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index e8ae30e..f6dd3da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
 
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -1603,6 +1604,14 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
+  @Override
+  public String toString() {
+    // The fields are excluded because they are passed from HashAggBatch
+    String[] excludedFields = new String[] {
+        "baseHashTable", "incoming", "outgoing", "context", "oContext", "allocator", "htables", "newIncoming"};
+    return ReflectionToStringBuilder.toStringExclude(this, excludedFields);
+  }
+
   // Code-generated methods (implemented in HashAggBatch)
   public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 7ebce2b..822a810 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -40,6 +41,9 @@ import java.util.Iterator;
  * A class to replace "incoming" - instead scanning a spilled partition file
  */
 public class SpilledRecordbatch implements CloseableRecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private InputStream spillStream;
   private int spilledBatches;
@@ -49,6 +53,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   private String spillFile;
   VectorAccessibleSerializable vas;
   private IterOutcome initialOutcome;
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
 
   public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
     this.context = context;
@@ -66,6 +73,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
     }
 
     initialOutcome = next(); // initialize the container
+    lastOutcome = initialOutcome;
   }
 
   @Override
@@ -126,14 +134,19 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
 
-    if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
+    if (!context.getExecutorState().shouldContinue()) {
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    }
 
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
 
     if ( spillStream == null ) {
+      lastOutcome = IterOutcome.STOP;
       throw new IllegalStateException("Spill stream was null");
     }
 
@@ -152,11 +165,16 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
         container = vas.get();
       }
     } catch (IOException e) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     }
 
     spilledBatches--; // one less batch to read
-    return IterOutcome.OK;
+    lastOutcome = IterOutcome.OK;
+    return lastOutcome;
   }
 
   /**
@@ -164,6 +182,17 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
    */
   public IterOutcome getInitialOutcome() { return initialOutcome; }
 
+  @Override
+  public void dump() {
+    logger.error("SpilledRecordbatch[container={}, spilledBatches={}, schema={}, spillFile={}, spillSet={}]",
+        container, spilledBatches, schema, spillFile, spillSet);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   /**
    * Note: ignoring any IO errors (e.g. file not found)
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c42f9bf..2b9b317 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -648,4 +648,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
+        container, popConfig, aggregator, incomingSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index f30616b..4bde7ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -498,6 +498,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   public void cleanup() {
   }
 
+  @Override
+  public String toString() {
+    return "StreamingAggTemplate[underlyingIndex=" + underlyingIndex
+        + ", previousIndex=" + previousIndex
+        + ", currentIndex=" + currentIndex
+        + ", addedRecordCount=" + addedRecordCount
+        + ", outputCount=" + outputCount
+        + "]";
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b8d4e76..179e6c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -43,6 +43,9 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private Filterer filter;
@@ -196,4 +199,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 7b0183b..483a9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -119,4 +119,11 @@ public abstract class FilterTemplate2 implements Filterer {
                                  @Named("outIndex") int outIndex)
                           throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "FilterTemplate2[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + ", svMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index e09ed75..d85d6f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -64,4 +64,10 @@ public abstract class FilterTemplate4 implements Filterer {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
+  @Override
+  public String toString() {
+    return "FilterTemplate4[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 7faaaa5..bc21580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -247,4 +247,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       }
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public void dump() {
+    logger.error("RuntimeFilterRecordBatch[container={}, selectionVector={}, toFilterFields={}, "
+        + "originalRecordCount={}, batchSchema={}]",
+        container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 1623319..86ddcd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -532,4 +532,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     updateStats();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("FlattenRecordbatch[hasRemainder={}, remainderIndex={}, recordCount={}, flattener={}, container={}]",
+        hasRemainder, remainderIndex, recordCount, flattener, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index e59abac..fe38244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -175,4 +175,15 @@ public abstract class FlattenTemplate implements Flattener {
                                @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean doEval(@Named("inIndex") int inIndex,
                                  @Named("outIndex") int outIndex) throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "FlattenTemplate[svMode=" + svMode
+        + ", fieldToFlatten=" + fieldToFlatten
+        + ", valueIndex=" + valueIndex
+        + ", outputLimit=" + outputLimit
+        + ", innerValueIndex=" + innerValueIndex
+        + ", currentInnerValueIndex=" + currentInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5d..89ab8d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1276,4 +1276,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     return hj;
   }
 
+  @Override
+  public void dump() {
+    logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
+            " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
+        joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 639f757..71abeda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -432,4 +432,17 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
       (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
         ProbeState.DONE; // else we're done
   }
+
+  @Override
+  public String toString() {
+    return "HashJoinProbeTemplate[container=" + container
+        + ", probeSchema=" + probeSchema
+        + ", joinType=" + joinType
+        + ", recordsToProcess=" + recordsToProcess
+        + ", recordsProcessed=" + recordsProcessed
+        + ", outputRecords=" + outputRecords
+        + ", probeState=" + probeState
+        + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 1aaf5e2..242687f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1205,4 +1205,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " +
+            "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
+        container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex,
+        leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 72f776a..d502c4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -547,4 +547,11 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     }
     return materializedExpr;
   }
+
+  @Override
+  public void dump() {
+    logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}," +
+            " rightIterator={}, joinStatus={}, joinType={}]",
+        container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status, joinType);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index e2f93ec..6b0c749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -459,4 +459,12 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   public int getRecordCount() {
     return outputRecords;
   }
+
+  @Override
+  public void dump() {
+    logger.error("NestedLoopJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "leftSchema={}, rightSchema={}, outputRecords={}, rightContainer={}, rightCounts={}]",
+        container, left, right, leftUpstream, rightUpstream,
+        leftSchema, rightSchema, outputRecords, rightContainer, rightCounts);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 784d955..bb49187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -37,7 +37,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -58,46 +58,46 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   @Override
   public IterOutcome innerNext() {
     if (!first && !needMoreRecords(numberOfRecords)) {
-        outgoingSv.setRecordCount(0);
-        incoming.kill(true);
+      outgoingSv.setRecordCount(0);
+      incoming.kill(true);
 
-        IterOutcome upStream = next(incoming);
+      IterOutcome upStream = next(incoming);
+      if (upStream == IterOutcome.OUT_OF_MEMORY) {
+        return upStream;
+      }
+
+      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
+        }
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
+        }
+        upStream = next(incoming);
         if (upStream == IterOutcome.OUT_OF_MEMORY) {
           return upStream;
         }
-
-        while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-          upStream = next(incoming);
-          if (upStream == IterOutcome.OUT_OF_MEMORY) {
-            return upStream;
-          }
+      }
+      // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
+      if (upStream == EMIT) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
         }
-        // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-        if (upStream == EMIT) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-
-          refreshLimitState();
-          return upStream;
+
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
         }
-        // other leaf operator behave as before.
-        return NONE;
+
+        refreshLimitState();
+        return upStream;
       }
+      // other leaf operator behave as before.
+      return NONE;
+    }
     return super.innerNext();
   }
 
@@ -266,4 +266,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
     first = true;
   }
+
+  @Override
+  public void dump() {
+    logger.error("LimitRecordBatch[container={}, offset={}, numberOfRecords={}, incomingSV={}, outgoingSV={}]",
+        container, recordStartOffset, numberOfRecords, incomingSv, outgoingSv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index fe1660f..ed7b265 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -40,7 +40,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
  * implicit column for rowId for each row.
  */
 public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -250,4 +250,12 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti
     numberOfRecords = (popConfig.getLast() == null) ?
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
   }
+
+  @Override
+  public void dump() {
+    logger.error("PartitionLimitRecordBatch[container={}, popConfig={}, incomingSV={}, outgoingSV={},"
+            + " recordStartOffset={}, numberOfRecords={}, partitionId={}, unionTypeEnabled={}, state={}]",
+        container, popConfig, incomingSv, outgoingSv, recordStartOffset, numberOfRecords,
+        partitionId, unionTypeEnabled, state);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 791b24a..12ee668 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.mergereceiver;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -827,4 +828,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, "
+            + "tempBatchHolder={}, inputCounts={}, outputCounts={}]",
+        container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets),
+        Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index a3aa11b..63a0121 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -646,4 +646,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " +
+            "recordsSampled={}, recordCount={}]",
+        container, popConfig, partitionVectors, partitions, recordsSampled, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bbcb758..9385400 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -247,4 +247,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]",
+        container, recordCount, schema, stop);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4d55f00..8ea15d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -898,4 +898,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     wasNone = true;
     return IterOutcome.OK_NEW_SCHEMA;
   }
+
+  @Override
+  public void dump() {
+    logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]",
+        projector, hasRemainder, remainderIndex, recordCount, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 02ccd4b..2f1aa02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -110,4 +110,10 @@ public abstract class ProjectorTemplate implements Projector {
                               @Named("outIndex") int outIndex)
                        throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "Projector[vector2=" + vector2
+        + ", selectionVectorMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 620f150..e0beab1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -55,6 +55,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
 
   private final OperatorDriver driver;
   private final BatchAccessor batchAccessor;
+  private IterOutcome lastOutcome;
 
   public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
     OperatorContext opContext = context.newOperatorContext(config);
@@ -143,7 +144,12 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   public IterOutcome next() {
     try {
       driver.operatorContext().getStats().startProcessing();
-      return driver.next();
+      lastOutcome = driver.next();
+      return lastOutcome;
+    } catch (Exception e) {
+      // mark batch as failed
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       driver.operatorContext().getStats().stopProcessing();
     }
@@ -158,4 +164,14 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   public VectorContainer getContainer() {
     return batchAccessor.getOutgoingContainer();
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fc49d43..fcbb10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -213,4 +213,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("SortBatch[popConfig={}, container={}, sorter={}, schema={}]", popConfig, container, sorter, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index da476cc..b3c6a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -70,4 +70,8 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
 
+  @Override
+  public String toString() {
+    return "SortTemplate[vector4=" + vector4 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1471d5e..a8c3622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -92,4 +92,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
+
+  @Override
+  public void dump() {
+    logger.error("RemovingRecordBatch[container={}, state={}, copier={}]", container, state, copier);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 50cb26b..d56f848 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-/* TraceRecordBatch contains value vectors which are exactly the same
+/** TraceRecordBatch contains value vectors which are exactly the same
  * as the incoming record batch's value vectors. If the incoming
  * record batch has a selection vector (type 2) then TraceRecordBatch
  * will also contain a selection vector.
@@ -171,4 +171,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("TraceRecordBatch[filename={}, logLocation={}, selectionVector={}]", getFileName(), logLocation, sv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 7e16d6a..e83fddf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -440,4 +440,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
       batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnionAllRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "recordCount={}]", container, left, right, leftUpstream, rightUpstream, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 285481f..508999f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -191,4 +191,14 @@ public class UnnestImpl implements Unnest {
       transfers = null;
     }
   }
+
+  @Override
+  public String toString() {
+    return "UnnestImpl[svMode=" + svMode
+        + ", outputLimit=" + outputLimit
+        + ", valueIndex=" + valueIndex
+        + ", innerValueIndex=" + innerValueIndex
+        + ", runningInnerValueIndex=" + runningInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 5f63967..1c8336d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -241,7 +241,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     }
   }
 
-    @Override
+  @Override
   public VectorContainer getOutgoingContainer() {
     return this.container;
   }
@@ -446,4 +446,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnnestRecordBatch[container={}, unnest={}, hasRemainder={}, remainderIndex={}, " +
+            "unnestFieldMetadata={}]", container, unnest, hasRemainder, remainderIndex, unnestFieldMetadata);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8cdc0a1..a6ebef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -64,6 +64,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   private boolean first = true;
   private final UnorderedReceiver config;
   private final OperatorContext oContext;
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -156,7 +159,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
   public IterOutcome next() {
     batchLoader.resetRecordCount();
     stats.startProcessing();
-    try{
+    try {
       RawFragmentBatch batch;
       try {
         stats.startWait();
@@ -174,15 +177,17 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       first = false;
 
       if (batch == null) {
+        lastOutcome = IterOutcome.NONE;
         batchLoader.zero();
         if (!context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
+          lastOutcome = IterOutcome.STOP;
         }
-        return IterOutcome.NONE;
+        return lastOutcome;
       }
 
       if (context.getAllocator().isOverLimit()) {
-        return IterOutcome.OUT_OF_MEMORY;
+        lastOutcome = IterOutcome.OUT_OF_MEMORY;
+        return lastOutcome;
       }
 
       final RecordBatchDef rbd = batch.getHeader().getDef();
@@ -195,14 +200,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       if(schemaChanged) {
         this.schema = batchLoader.getSchema();
         stats.batchReceived(0, rbd.getRecordCount(), true);
-        return IterOutcome.OK_NEW_SCHEMA;
+        lastOutcome = IterOutcome.OK_NEW_SCHEMA;
       } else {
         stats.batchReceived(0, rbd.getRecordCount(), false);
-        return IterOutcome.OK;
+        lastOutcome = IterOutcome.OK;
       }
-    } catch(SchemaChangeException | IOException ex) {
+      return lastOutcome;
+    } catch (SchemaChangeException | IOException ex) {
       context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -270,4 +280,14 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 88f4c7d..1ea3895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -322,8 +322,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
       }
 
       return batchState;
-    }
-    catch (RuntimeException | Error e) {
+    } catch (RuntimeException | Error e) {
       exceptionState = e;
       logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
                    instNum, batchTypeName, prevBatchState, exceptionState,
@@ -366,4 +365,14 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
   public RecordBatch getIncoming() { return incoming; }
 
+  @Override
+  public boolean hasFailed() {
+    return exceptionState != null || batchState == STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
+           + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5cd6de7..1e477ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -295,6 +295,17 @@ public abstract class FrameSupportTemplate implements WindowFramer {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", frameLastRow=" + frameLastRow
+        + ", remainingRows=" + remainingRows
+        + ", partialPartition=" + partialPartition
+        + "]";
+  }
+
   /**
    * called once for each peer row of the current frame.
    * @param index of row to aggregate
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index 55c27c1..cc7a04d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -298,6 +298,17 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", requireFullPartition=" + requireFullPartition
+        + ", partition=" + partition
+        + "]";
+  }
+
+
   /**
    * called once for each row after we evaluate all peer rows. Used to write a value in the row
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index 7d98724..a9baea9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -106,4 +106,9 @@ public class WindowDataBatch implements VectorAccessible {
   public void clear() {
     container.clear();
   }
+
+  @Override
+  public String toString() {
+    return "WindowDataBatch[container=" + container + ", recordCount=" + recordCount + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index a372a3c..59e84ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
@@ -426,4 +427,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   public int getRecordCount() {
     return framers[0].getOutputCount();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WindowFrameRecordBatch[container={}, popConfig={}, framers={}, schema={}]",
+        container, popConfig, Arrays.toString(framers), schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0edf974..262a241 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -824,4 +824,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sorter={}, mSorter={}, container={}]", schema, sorter, mSorter, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 5ebec50..9b10d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -206,4 +206,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                              @Named("rightIndex") int rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "MSortTemplate[vector4=" + vector4
+        + ", aux=" + aux
+        + ", runStarts=" + runStarts
+        + ", desiredRecordBatchCount=" + desiredRecordBatchCount
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index de783df..57d2ec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -83,4 +83,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
   public abstract int doEval(@Named("leftIndex") char leftIndex,
                              @Named("rightIndex") char rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "SinglebatchSorterTemplate[vector2=" + vector2 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 7db4d3b..8fc4a74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -698,4 +698,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
     return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
   }
+
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sortState={}, sortConfig={}, outputWrapperContainer={}, "
+            + "outputSV4={}, container={}]",
+        schema, sortState, sortConfig, outputWrapperContainer, outputSV4, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index e592ccb..fbcf1ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -137,6 +137,15 @@ public class SortConfig {
                   mergeBatchSize, mSortBatchSize);
   }
 
+  @Override
+  public String toString() {
+    return "SortConfig[spillFileSize=" + spillFileSize
+        + ", spillBatchSize=" + spillBatchSize
+        + ", mergeBatchSize=" + mergeBatchSize
+        + ", mSortBatchSize=" + mSortBatchSize
+        + "]";
+  }
+
   public long maxMemory() { return maxMemory; }
   public int mergeLimit() { return mergeLimit; }
   public long spillFileSize() { return spillFileSize; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index ac30e94..79294cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -592,4 +592,12 @@ public class SortImpl {
       throw ex;
     }
   }
+
+  @Override
+  public String toString() {
+    return "SortImpl[config=" + config
+        + ", outputBatch=" + outputBatch
+        + ", sizer=" + sizer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c38de2d..362ea29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -47,6 +47,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final boolean unionTypeEnabled;
   protected BatchState state;
 
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
+
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
     this(popConfig, context, true, context.newOperatorContext(popConfig));
   }
@@ -113,7 +117,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   public final IterOutcome next(final int inputIndex, final RecordBatch b){
-    IterOutcome next = null;
+    IterOutcome next;
     stats.stopProcessing();
     try{
       if (!context.getExecutorState().shouldContinue()) {
@@ -132,15 +136,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     }
 
     switch(next) {
-    case OK_NEW_SCHEMA:
-      stats.batchReceived(inputIndex, b.getRecordCount(), true);
-      break;
-    case OK:
-    case EMIT:
-      stats.batchReceived(inputIndex, b.getRecordCount(), false);
-      break;
-    default:
-      break;
+      case OK_NEW_SCHEMA:
+        stats.batchReceived(inputIndex, b.getRecordCount(), true);
+        break;
+      case OK:
+      case EMIT:
+        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        break;
+      default:
+        break;
     }
 
     return next;
@@ -155,27 +159,38 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
           buildSchema();
           switch (state) {
             case DONE:
-              return IterOutcome.NONE;
+              lastOutcome = IterOutcome.NONE;
+              break;
             case OUT_OF_MEMORY:
               // because we don't support schema changes, it is safe to fail the query right away
               context.getExecutorState().fail(UserException.memoryError()
                 .build(logger));
               // FALL-THROUGH
             case STOP:
-              return IterOutcome.STOP;
+              lastOutcome = IterOutcome.STOP;
+              break;
             default:
               state = BatchState.FIRST;
-              return IterOutcome.OK_NEW_SCHEMA;
+              lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+              break;
           }
+          break;
         }
         case DONE: {
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          break;
         }
         default:
-          return innerNext();
+          lastOutcome = innerNext();
+          break;
       }
+      return lastOutcome;
     } catch (final SchemaChangeException e) {
+      lastOutcome = IterOutcome.STOP;
       throw new DrillRuntimeException(e);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -245,6 +260,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return  container;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   public RecordBatchStatsContext getRecordBatchStatsContext() {
     return batchStatsContext;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index cb3a61c..dda4ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -59,6 +59,5 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato
     setIncoming(incoming.getIncoming());
     lateral = incoming;
   }
-
 }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c65827c..7473c8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -320,4 +320,19 @@ public interface RecordBatch extends VectorAccessible {
    * buffers.
    */
   WritableBatch getWritableBatch();
+
+  /**
+   * Perform dump of this batch's state to logs.
+   */
+  void dump();
+
+  /**
+   * Use this method to see if the batch has failed. Currently used when logging {@code RecordBatch}'s
+   * state using {@link #dump()} method.
+   *
+   * @return {@code true} if either {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP}
+   * was returned by its or child's {@link #next()} invocation or there was an {@code Exception} thrown
+   * during execution of the batch; {@code false} otherwise
+   */
+  boolean hasFailed();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e841f2f..696d6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -292,4 +292,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     container.clear();
     resetRecordCount();
   }
+
+  @Override
+  public String toString() {
+    return "RecordBatchLoader[container=" + container
+        + ", valueCount=" + valueCount
+        + ", schema=" + schema
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 47b11a6..04cf32e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -367,4 +367,18 @@ public class RecordIterator implements VectorAccessible {
     clear();
     clearInflightBatches();
   }
+
+  @Override
+  public String toString() {
+    return "RecordIterator[outerPosition=" + outerPosition
+        + ", innerPosition=" + innerPosition
+        + ", innerRecordCount=" + innerRecordCount
+        + ", totalRecordCount=" + totalRecordCount
+        + ", startBatchPosition=" + startBatchPosition
+        + ", markedInnerPosition" + markedInnerPosition
+        + ", markedOuterPosition=" + markedOuterPosition
+        + ", lastOutcome=" + lastOutcome
+        + ", inputIndex=" + inputIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 9dfa129..e4278ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -107,4 +107,14 @@ public class SchemalessBatch implements CloseableRecordBatch {
 
   @Override
   public VectorContainer getContainer() { return null; }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("SchemalessBatch[]");
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index 4063e55..c588f25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -28,6 +28,9 @@ import java.util.Iterator;
  * Wrap a VectorContainer into a record batch.
  */
 public class SimpleRecordBatch implements RecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private FragmentContext context;
 
@@ -99,4 +102,14 @@ public class SimpleRecordBatch implements RecordBatch {
   public VectorContainer getContainer() {
      return container;
   }
+
+  @Override
+  public void dump() {
+    logger.error("SimpleRecordBatch[container=" + container + "]");
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index a0b47ed..4f4f88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -161,4 +161,13 @@ public class SelectionVector4 implements AutoCloseable {
   public void close() {
     clear();
   }
+
+  @Override
+  public String toString() {
+    return "SelectionVector4[data=" + data
+        + ", recordCount=" + recordCount
+        + ", start=" + start
+        + ", length=" + length
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 7accdc4..9314da6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -104,5 +104,4 @@ public abstract class AbstractRecordReader implements RecordReader {
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return GroupScan.ALL_COLUMNS;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index a0dda01..edd91d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,8 +26,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.vector.ValueVector;
 
 public interface RecordReader extends AutoCloseable {
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+  long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+  long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
   /**
    * Configure the RecordReader with the provided schema and the record batch that should be written to.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
index 68c3c36..31c0103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -222,4 +222,9 @@ public class StorageStrategy {
       fs.deleteOnExit(path);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "StorageStrategy[umask=" + umask + ", deleteOnExist=" + deleteOnExit + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 6945fff..7668130 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -66,7 +66,7 @@ import org.joda.time.DateTimeConstants;
 /**
  * A RecordReader implementation for Avro data files.
  *
- * @see RecordReader
+ * @see org.apache.drill.exec.store.RecordReader
  */
 public class AvroRecordReader extends AbstractRecordReader {
 
@@ -398,4 +398,17 @@ public class AvroRecordReader extends AbstractRecordReader {
       }
     }
   }
+
+  @Override
+  public String toString() {
+    long currentPosition = -1L;
+    try {
+      currentPosition = reader.tell();
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: " + e.getMessage());
+    }
+    return "AvroRecordReader[File=" + hadoop
+        + ", Position=" + currentPosition
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
index 5d9e105..2580010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -54,6 +53,8 @@ public class BsonRecordReader {
   private final boolean readNumbersAsDouble;
   protected DrillBuf workBuf;
   private String currentFieldName;
+  // Used for error context
+  private BsonReader reader;
 
   public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
     this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
@@ -67,6 +68,7 @@ public class BsonRecordReader {
   }
 
   public void write(ComplexWriter writer, BsonReader reader) throws IOException {
+    this.reader = reader;
     reader.readStartDocument();
     BsonType readBsonType = reader.getCurrentBsonType();
     switch (readBsonType) {
@@ -364,17 +366,20 @@ public class BsonRecordReader {
     }
   }
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field,
-      String msg, Object... args) {
-    return null;
-  }
-
-  public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) {
-    return null;
-  }
-
   private void ensure(final int length) {
     workBuf = workBuf.reallocIfNeeded(length);
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("BsonRecordReader[");
+    if (reader != null) {
+      sb.append("Name=")
+          .append(reader.getCurrentName())
+          .append(", Type=")
+          .append(reader.getCurrentBsonType());
+    }
+    sb.append(']');
+    return sb.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 7a4e4a2..379e2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -103,4 +103,12 @@ public class EasyWriter extends AbstractWriter {
   public int getOperatorType() {
     return formatPlugin.getWriterOperatorType();
   }
+
+  @Override
+  public String toString() {
+    return "EasyWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4b8bbf8..62ace66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -104,7 +104,7 @@ public class JSONRecordReader extends AbstractRecordReader {
         "One of inputPath or embeddedContent must be set but not both."
         );
 
-    if(inputPath != null) {
+    if (inputPath != null) {
       this.hadoopPath = new Path(inputPath);
     } else {
       this.embeddedContent = embeddedContent;
@@ -126,9 +126,11 @@ public class JSONRecordReader extends AbstractRecordReader {
   public String toString() {
     return super.toString()
         + "[hadoopPath = " + hadoopPath
+        + ", currentRecord=" + currentRecordNumberInFile()
+        + ", jsonReader=" + jsonReader
         + ", recordCount = " + recordCount
         + ", parseErrorCount = " + parseErrorCount
-         + ", runningRecordCount = " + runningRecordCount + ", ...]";
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
   }
 
   @Override
@@ -162,9 +164,9 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   private void setupParser() throws IOException {
-    if(hadoopPath != null){
+    if (hadoopPath != null) {
       jsonReader.setSource(stream);
-    }else{
+    } else {
       jsonReader.setSource(embeddedContent);
     }
     jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
@@ -182,7 +184,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     }
 
     UserException.Builder exceptionBuilder = UserException.dataReadError(e)
-            .message("%s - %s", suffix, message);
+        .message("%s - %s", suffix, message);
     if (columnNr > 0) {
       exceptionBuilder.pushContext("Column ", columnNr);
     }
@@ -205,36 +207,32 @@ public class JSONRecordReader extends AbstractRecordReader {
     writer.reset();
     recordCount = 0;
     parseErrorCount = 0;
-    if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+    if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
       return recordCount;
     }
-    outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
-      try{
+    while (recordCount < DEFAULT_ROWS_PER_BATCH) {
+      try {
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
-        if(write == ReadState.WRITE_SUCCEED){
+        if (write == ReadState.WRITE_SUCCEED) {
           recordCount++;
-        }
-        else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-          if(skipMalformedJSONRecords == false){
-            handleAndRaise("Error parsing JSON", new Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1)));
+        } else if (write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+          if (!skipMalformedJSONRecords) {
+            handleAndRaise("Error parsing JSON", new Exception());
           }
           ++parseErrorCount;
-          if(printSkippedMalformedJSONRecordLineNumber){
-            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount+parseErrorCount));
+          if (printSkippedMalformedJSONRecordLineNumber) {
+            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount));
           }
-          if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-            break outside;
+          if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+            break;
           }
+        } else {
+          break;
         }
-        else{
-          break outside;
-        }
+      } catch (IOException ex) {
+        handleAndRaise("Error parsing JSON", ex);
       }
-      catch(IOException ex)
-        {
-           handleAndRaise("Error parsing JSON", ex);
-        }
     }
     // Skip empty json file with 0 row.
     // Only when data source has > 0 row, ensure the batch has one field.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index fba80e5..adab033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 public interface JsonProcessor {
 
-  public static enum ReadState {
+  enum ReadState {
     END_OF_STREAM,
     JSON_RECORD_PARSE_ERROR,
     JSON_RECORD_PARSE_EOF_ERROR,
@@ -41,17 +41,11 @@ public interface JsonProcessor {
 
   void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String message);
 
-  public UserException.Builder getExceptionWithContext(Throwable exception,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(Throwable exception, String message);
 
-  public boolean ignoreJSONParseError();
+  boolean ignoreJSONParseError();
 
-  public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
+  void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index aaa74ae..48a1464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
 
+import com.fasterxml.jackson.core.JsonLocation;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -49,8 +50,12 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
 
   protected JsonParser parser;
   protected DrillBuf workBuf;
-  protected JsonToken lastSeenJsonToken = null;
-  boolean ignoreJSONParseErrors = false; // default False
+  protected JsonToken lastSeenJsonToken;
+  boolean ignoreJSONParseErrors;
+  /**
+   * The name of the current field being parsed. For Error messages.
+   */
+  protected String currentFieldName;
 
   /**
    *
@@ -90,26 +95,31 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(
-      UserException.Builder exceptionBuilder, String field, String msg,
-      Object... args) {
-    if (msg != null) {
-      exceptionBuilder.message(msg, args);
-    }
-    if (field != null) {
-      exceptionBuilder.pushContext("Field ", field);
+  public String toString() {
+    JsonLocation location = parser.getCurrentLocation();
+    return getClass().getSimpleName() + "[Line=" + location.getLineNr()
+        + ", Column=" + (location.getColumnNr() + 1)
+        + ", Field=" + getCurrentField()
+        + "]";
+  }
+
+  @Override
+  public UserException.Builder getExceptionWithContext(UserException.Builder builder, String message) {
+    builder.message(message);
+    JsonLocation location = parser.getCurrentLocation();
+    builder.addContext("Line", location.getLineNr())
+        .addContext("Column", location.getColumnNr() + 1);
+    String fieldName = getCurrentField();
+    if (fieldName != null) {
+      builder.addContext("Field", fieldName);
     }
-    exceptionBuilder.pushContext("Column ",
-        parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ",
-        parser.getCurrentLocation().getLineNr());
-    return exceptionBuilder;
+    return builder;
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(Throwable e,
-      String field, String msg, Object... args) {
+  public UserException.Builder getExceptionWithContext(Throwable e, String message) {
     UserException.Builder exceptionBuilder = UserException.dataReadError(e);
-    return getExceptionWithContext(exceptionBuilder, field, msg, args);
+    return getExceptionWithContext(exceptionBuilder, message);
   }
 
   /*
@@ -138,4 +148,8 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
     }
     return JsonExceptionProcessingState.PROC_SUCCEED;
   }
+
+  protected String getCurrentField() {
+    return currentFieldName;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index a5e9f1a..0f92ec5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonToken;
 
 import io.netty.buffer.DrillBuf;
 
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
 public class CountingJsonReader extends BaseJsonProcessor {
@@ -41,18 +39,14 @@ public class CountingJsonReader extends BaseJsonProcessor {
         token = parser.nextToken();
       }
       lastSeenJsonToken = null;
+      if (token == JsonToken.FIELD_NAME) {
+        currentFieldName = parser.getText();
+      }
       if (!parser.hasCurrentToken()) {
         return ReadState.END_OF_STREAM;
       } else if (token != JsonToken.START_OBJECT) {
         throw new com.fasterxml.jackson.core.JsonParseException(
-            parser,
-            String
-                .format(
-                    "Cannot read from the middle of a record. Current token was %s ",
-                    token));
-        // throw new
-        // IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s",
-        // token));
+            parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
       }
       writer.rootAsMap().bit("count").writeBit(1);
       parser.skipChildren();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
index 3d88b1a..549df82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
@@ -163,4 +163,19 @@ public class SequenceFileRecordReader extends AbstractRecordReader {
       logger.warn("Exception closing reader: {}", e);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    long position = -1L;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: " + e.getMessage());
+    }
+    return "SequenceFileRecordReader[File=" + split.getPath()
+        + ", Position=" + position
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 9a1d486..7aa9b04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -216,7 +216,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
     } catch (IOException | TextParsingException e) {
       throw UserException.dataReadError(e)
           .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
-            split.getPath(), reader.getPos())
+              split.getPath(), reader.getPos())
           .build(logger);
     }
   }
@@ -248,4 +248,11 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
       logger.warn("Exception while closing stream.", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "CompliantTextRecordReader[File=" + split.getPath()
+        + ", reader=" + reader
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
index a181d42..7a9ed46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -500,4 +500,12 @@ final class TextReader {
     input.close();
   }
 
+  @Override
+  public String toString() {
+    return "TextReader[Line=" + context.currentLine()
+        + ", Column=" + context.currentChar()
+        + ", Record=" + context.currentRecord()
+        + ", Byte pos=" + getPos()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 18437df..f43bb88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -242,6 +242,14 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
       }
     }
 
+    @Override
+    public String toString() {
+      return "HttpdLogRecordReader[Path=" + work.getPath()
+          + ", Start=" + work.getStart()
+          + ", Length=" + work.getLength()
+          + ", Line=" + lineNumber.get()
+          + "]";
+    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 91f8b99..2a4b4fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -261,7 +261,7 @@ public class ImageRecordReader extends AbstractRecordReader {
   }
 
   private void processXmpDirectory(final MapWriter writer, final XmpDirectory directory) {
-    HashSet<String> listItems = new HashSet();
+    HashSet<String> listItems = new HashSet<>();
     XMPMeta xmpMeta = directory.getXMPMeta();
     if (xmpMeta != null) {
       try {
@@ -490,4 +490,9 @@ public class ImageRecordReader extends AbstractRecordReader {
       metadataStream.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return "ImageRecordReader[Path=" + hadoopPath.toUri().getPath() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index 56bc1cc..e5d1dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -761,4 +761,11 @@ public class LogRecordReader extends AbstractRecordReader {
       reader = null;
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "LogRecordReader[File=" + fileWork.getPath()
+        + ", Line=" + rowIndex
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 1cd393b..aea3218 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -121,4 +121,11 @@ public class ParquetWriter extends AbstractWriter {
     return CoreOperatorType.PARQUET_WRITER_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "ParquetWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2ada17d..17cf8c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -348,4 +348,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
       return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
     }
   }
+
+  @Override
+  public String toString() {
+    return "ParquetRecordReader[File=" + hadoopPath.toUri()
+        + ", Row group index=" + rowGroupIndex
+        + ", Records in row group=" + footer.getBlocks().get(rowGroupIndex).getRowCount()
+        + ", Total records read=" + (readState != null ? readState.recordsRead() : -1)
+        + ", Metadata" + footer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0c69d6d..7108ca6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -344,4 +344,9 @@ public class DrillParquetReader extends AbstractRecordReader {
       this.type = type;
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillParquetReader[pageReadStore=" + pageReadStore + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 794687f..d688f3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -396,4 +396,9 @@ public class PcapRecordReader extends AbstractRecordReader {
           .setSafe(count, value, 0, value.remaining());
     }
   }
+
+  @Override
+  public String toString() {
+    return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index d2cd9a8..0db17fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -232,4 +232,13 @@ public class DrillTextRecordReader extends AbstractRecordReader {
       logger.warn("Exception closing reader: {}", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillTextRecordReader[File=" + split.getPath()
+        + ", Record=" + (totalRecordsRead + 1)
+        + ", Start=" + split.getStart()
+        + ", Length=" + split.getLength()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 2810c04..aaa9806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -70,10 +70,6 @@ public class JsonReader extends BaseJsonProcessor {
    * outer list.
    */
   private boolean inOuterList;
-  /**
-   * The name of the current field being parsed. For Error messages.
-   */
-  private String currentFieldName;
 
   private FieldSelection selection;
 
@@ -214,10 +210,9 @@ public class JsonReader extends BaseJsonProcessor {
       case WRITE_SUCCEED:
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while reading JSON. (Got an invalid read state %s )",
-            readState.toString()).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+            .build(logger);
       }
     } catch (com.fasterxml.jackson.core.JsonParseException ex) {
       if (ignoreJSONParseError()) {
@@ -236,13 +231,10 @@ public class JsonReader extends BaseJsonProcessor {
   private void confirmLast() throws IOException {
     parser.nextToken();
     if (!parser.isClosed()) {
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
-          .message(
-              "Drill attempted to unwrap a toplevel list "
-                  + "in your document.  However, it appears that there is trailing content after this top level list.  Drill only "
-                  + "supports querying a set of distinct maps or a single json array with multiple inner maps.")
-          .build(logger);
+      String message = "Drill attempted to unwrap a toplevel list in your document. "
+          + "However, it appears that there is trailing content after this top level list.  Drill only "
+          + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+      throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
     }
   }
 
@@ -255,11 +247,9 @@ public class JsonReader extends BaseJsonProcessor {
       break;
     case START_ARRAY:
       if (inOuterList) {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null)
-            .message(
-                "The top level of your document must either be a single array of maps or a set "
-                    + "of white space delimited maps.").build(logger);
+        String message = "The top level of your document must either be a single array of maps or a set "
+            + "of white space delimited maps.";
+        throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
       }
 
       if (skipOuterList) {
@@ -268,11 +258,9 @@ public class JsonReader extends BaseJsonProcessor {
           inOuterList = true;
           writeDataSwitch(writer.rootAsMap());
         } else {
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null)
-              .message(
-                  "The top level of your document must either be a single array of maps or a set "
-                      + "of white space delimited maps.").build(logger);
+          String message = "The top level of your document must either be a single array of maps or a set "
+              + "of white space delimited maps.";
+          throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
         }
 
       } else {
@@ -285,17 +273,14 @@ public class JsonReader extends BaseJsonProcessor {
         confirmLast();
         return ReadState.END_OF_STREAM;
       } else {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while parsing JSON.  Ran across unexpected %s.",
-            JsonToken.END_ARRAY).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
       }
 
     case NOT_AVAILABLE:
       return ReadState.END_OF_STREAM;
     default:
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
+      throw getExceptionWithContext(UserException.dataReadError(), null)
           .message(
               "Failure while parsing JSON.  Found token of [%s].  Drill currently only supports parsing "
                   + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
@@ -412,9 +397,9 @@ public class JsonReader extends BaseJsonProcessor {
           break;
 
         default:
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null).message("Unexpected token %s",
-              parser.getCurrentToken()).build(logger);
+          throw getExceptionWithContext(UserException.dataReadError(), null)
+              .message("Unexpected token %s", parser.getCurrentToken())
+              .build(logger);
         }
 
       }
@@ -478,8 +463,7 @@ public class JsonReader extends BaseJsonProcessor {
         break;
 
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
@@ -591,8 +575,7 @@ public class JsonReader extends BaseJsonProcessor {
               .build(logger);
         }
       } catch (Exception e) {
-        throw getExceptionWithContext(e, this.currentFieldName, null).build(
-            logger);
+        throw getExceptionWithContext(e, null).build(logger);
       }
     }
     list.endList();
@@ -637,8 +620,7 @@ public class JsonReader extends BaseJsonProcessor {
         handleString(parser, list);
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7cb07eb..a9e9e62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -207,6 +207,10 @@ public class FragmentExecutor implements Runnable {
   }
 
   private void cleanup(FragmentState state) {
+    if (fragmentState.get() == FragmentState.FAILED) {
+      root.dumpBatches();
+    }
+
     closeOutResources();
 
     updateState(state);
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 89731ff..4bb1a22 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -269,4 +269,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   public long getRowCount() {
     return rowCount;
   }
+
+  @Override
+  public String toString() {
+    return "ColumnChunkIncReadStore[File=" + path.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
new file mode 100644
index 0000000..18ba61c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOperatorDump extends ClusterTest {
+
+  private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed";
+  private static final String ENTRY_DUMP_STARTED = "Batch dump started";
+
+  private LogFixture logFixture;
+  private EventAwareContextAppender appender;
+
+  @BeforeClass
+  public static void setupFiles() {
+    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+  }
+
+  @Before
+  public void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    appender = new EventAwareContextAppender();
+    logFixture = LogFixture.builder()
+        .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
+        .build();
+    startCluster(builder);
+  }
+
+  @After
+  public void tearDown(){
+    logFixture.close();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testScanBatchChecked() throws Exception {
+    String exceptionDesc = "next-allocate";
+    final String controls = Controls.newBuilder()
+        .addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select * from dfs.`multilevel/parquet` limit 100";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(exceptionDesc));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ScanBatch.class.getName());
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testExternalSortUnchecked() throws Exception {
+    Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class;
+    final String controls = Controls.newBuilder()
+        .addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName());
+      throw e;
+    }
+  }
+
+  private void validateContainsEntries(String[] entries, String expectedClassName) {
+    if (entries == null) {
+      entries = new String[0];
+    }
+    List<String> messages = appender.getMessages();
+    List<String> entryList = new ArrayList<>(entries.length);
+    Collections.addAll(entryList, entries);
+    Iterator<String> it = entryList.iterator();
+    while (it.hasNext()) {
+      String entry = it.next();
+      for (String message : messages) {
+        if (message.contains(entry)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+    assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty());
+
+    Set<String> loggerNames = appender.getLoggerNames();
+    assertTrue(String.format("Entry for class %s was not found", expectedClassName),
+        loggerNames.contains(expectedClassName));
+  }
+
+  // ConsoleAppender which stores logged events
+  private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> {
+
+    private List<ILoggingEvent> events = new ArrayList<>();
+
+    @Override
+    protected void append(ILoggingEvent e) {
+      events.add(e);
+    }
+
+    List<String> getMessages() {
+      return events.stream()
+          .map(ILoggingEvent::getMessage)
+          .collect(Collectors.toList());
+    }
+
+    Set<String> getLoggerNames() {
+      return events.stream()
+          .map(ILoggingEvent::getLoggerName)
+          .collect(Collectors.toSet());
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 34d735e..94e0c0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -288,6 +288,15 @@ public class MockRecordBatch implements CloseableRecordBatch {
     this.limitWithUnnest = limitWithUnnest;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   public static class Builder {
     private final List<RowSet> rowSets = new ArrayList<>();
     private final List<IterOutcome> iterOutcomes = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 6d5b666..a176646 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -106,6 +106,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
   }
 
   @Override
+  public void dumpBatches() {
+    screenRoot.dumpBatches();
+  }
+
+  @Override
   public void close() throws Exception {
     screenRoot.close();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index c7105f9..aefa28a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -162,6 +162,15 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
 
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   @Override public int getRecordCount() {
     return 0;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 6f1e6e0..c05cdfd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -121,6 +121,15 @@ public class BloomFilterTest {
     public Iterator<VectorWrapper<?>> iterator() {
       return null;
     }
+
+    @Override
+    public void dump() {
+    }
+
+    @Override
+    public boolean hasFailed() {
+      return false;
+    }
   }
 
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b54b0b0..b62a188 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -86,6 +86,7 @@ public class LogFixture implements AutoCloseable {
     private String consoleFormat = DEFAULT_CONSOLE_FORMAT;
     private boolean logToConsole;
     private List<LogSpec> loggers = new ArrayList<>();
+    private ConsoleAppender<ILoggingEvent> appender;
 
     /**
      * Send all enabled logging to the console (if not already configured.) Some
@@ -102,6 +103,11 @@ public class LogFixture implements AutoCloseable {
       return this;
     }
 
+    public LogFixtureBuilder toConsole(ConsoleAppender<ILoggingEvent> appender, String format) {
+      this.appender = appender;
+      return toConsole(format);
+    }
+
     /**
      * Send logging to the console using the defined format.
      *
@@ -195,7 +201,7 @@ public class LogFixture implements AutoCloseable {
 
   private void setupConsole(LogFixtureBuilder builder) {
     drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
-    if (drillLogger.getAppender("STDOUT") != null) {
+    if (builder.appender == null && drillLogger.getAppender("STDOUT") != null) {
       return;
     }
     LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
@@ -204,10 +210,10 @@ public class LogFixture implements AutoCloseable {
     ple.setContext(lc);
     ple.start();
 
-    appender = new ConsoleAppender<>( );
+    appender = builder.appender == null ? new ConsoleAppender<>() : builder.appender;
     appender.setContext(lc);
     appender.setName("Console");
-    appender.setEncoder( ple );
+    appender.setEncoder(ple);
     appender.start();
 
     Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);


[drill] 02/02: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 3b1ae159b94ef7c1d67ddde474c75d5558d3e50a
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Tue Sep 25 19:07:10 2018 -0700

    DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
    
    - Preparations and cleanup for DRILL-6755
    
    clsoes #1480
---
 .../exec/physical/impl/join/HashJoinBatch.java     | 60 ++++++++++++++--------
 .../exec/record/AbstractBinaryRecordBatch.java     |  2 +-
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  |  2 +-
 .../physical/impl/join/TestHashJoinOutcome.java    | 43 ++++++++++++++++
 4 files changed, 84 insertions(+), 23 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 89ab8d4..658f03a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
  *   processed individually (that Build partition should be smaller than the original, hence likely fit whole into
  *   memory to allow probing; if not -- see below).
  *      Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- *   the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ *   the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
  *   read and divided into new partitions, which in turn may spill again (and again...).
  *   The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
  *   indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean joinIsLeftOrFull;
+  private boolean joinIsRightOrFull;
+  private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
 
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
-
-  private long maxIncomingBatchSize;
   /**
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
@@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         buildSchema = right.getSchema();
         // position of the new "column" for keeping the hash values (after the real columns)
         rightHVColPosition = right.getContainer().getNumberOfColumns();
+        // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+        skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
       }
@@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
       // Try to probe and project, or recursively handle a spilled partition
       if (!buildSideIsEmpty.booleanValue() ||  // If there are build-side rows
-        joinType != JoinRelType.INNER) {  // or if this is a left/full outer join
+          joinIsLeftOrFull) {  // or if this is a left/full outer join
 
         prefetchFirstProbeBatch();
 
         if (leftUpstream.isError() ||
-            ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+            ( leftUpstream == NONE && ! joinIsRightOrFull )) {
           // A termination condition was reached while prefetching the first probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
@@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
       } else {
         // Our build side is empty, we won't have any matches, clear the probe side
-        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : probeBatch) {
-            wrapper.getValueVector().clear();
-          }
-          probeBatch.kill(true);
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-            for (final VectorWrapper<?> wrapper : probeBatch) {
-              wrapper.getValueVector().clear();
-            }
-            leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          }
-        }
+        killAndDrainLeftUpstream();
       }
 
       // No more output records, clean up and return
@@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     }
   }
 
+  /**
+   *  In case an upstream data is no longer needed, send a kill and flush any remaining batch
+   *
+   * @param batch probe or build batch
+   * @param upstream which upstream
+   * @param isLeft is it the left or right
+   */
+  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+      batch.kill(true);
+      while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+        for (final VectorWrapper<?> wrapper : batch) {
+          wrapper.getValueVector().clear();
+        }
+        upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+      }
+  }
+  private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
+  private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
+
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
+    if ( skipHashTableBuild ) { return; }
+
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
@@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       return null;
     }
 
+    if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+      killAndDrainRightUpstream();
+      return null;
+    }
+
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
     boolean firstCycle = cycleNum == 0;
 
@@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
-        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
-        if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     rightExpr = new ArrayList<>(conditions.size());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6..486fb1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
     return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
   }
 
-  /*
+  /**
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 6d77d63..5487d95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -105,7 +105,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   @Override
   public void close() {
     if (!isTerminated() && context.getExecutorState().shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+      final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
       throw  new IllegalStateException(msg);
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a295..5beb7cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
   public void testHashJoinNoneOutcomeUninitLeftSide() {
     testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
   }
+
+  /**
+   * Testing for DRILL-6755: No Hash Table is built when the first probe batch is NONE
+   */
+  @Test
+  public void testHashJoinWhenProbeIsNONE() {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE);
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+    // for the probe side input - use multiple batches (to check that they are all cleared/drained)
+    final List<VectorContainer> buildSideinputContainer = new ArrayList<>(5);
+    buildSideinputContainer.add(emptyInputRowSetRight.container());
+    buildSideinputContainer.add(nonEmptyInputRowSetRight.container());
+    RowSet.SingleRowSet secondInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+    RowSet.SingleRowSet thirdInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+    buildSideinputContainer.add(secondInputRowSetRight.container());
+    buildSideinputContainer.add(thirdInputRowSetRight.container());
+
+    final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, buildSideinputContainer, inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER);
+
+    HashJoinBatch hjBatch = new HashJoinBatch(hjConf, operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight);
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE);
+
+    secondInputRowSetRight.clear();
+    thirdInputRowSetRight.clear();
+    buildSideinputContainer.clear();
+  }
 }