Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/10/01 12:36:05 UTC

[GitHub] asfgit closed pull request #1455: DRILL-6724: Dump operator context to logs when error occurs during query execution

URL: https://github.com/apache/drill/pull/1455

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance.
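
In brief, DRILL-6724 adds two hooks to record batches: hasFailed(), which
reports whether the batch's last next() call ended in IterOutcome.STOP, and
dump(), which writes the batch's state to the logs. A new dumpBatches() on
the root exec invokes them when query execution fails, and the many
toString() additions below exist so those dumps print something useful.
Sketched as simplified, hypothetical interfaces (not the exact Drill
signatures):

    // Hypothetical, simplified view of the hooks this PR adds; the real
    // methods live on CloseableRecordBatch and RootExec.
    interface FailureDumpable {
      boolean hasFailed(); // true if the last next() ended with IterOutcome.STOP
      void dump();         // write operator state to the logs
    }

    interface Root {
      void dumpBatches();  // dump the last failed batches to the logs
    }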

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 56cb935f326..3a3ca5af29a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -771,8 +771,10 @@ public String getErrorLocation() {
    * @return generated user error message
    */
   private String generateMessage(boolean includeErrorIdAndIdentity) {
+    boolean seeLogsMessage = errorType == DrillPBError.ErrorType.INTERNAL_ERROR
+        || errorType == DrillPBError.ErrorType.SYSTEM;
     return errorType + " ERROR: " + super.getMessage() + "\n\n" +
-        context.generateContextMessage(includeErrorIdAndIdentity);
+        context.generateContextMessage(includeErrorIdAndIdentity, seeLogsMessage);
   }
 
 }
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
index fa2c4373e75..271462ac755 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
@@ -29,6 +29,8 @@
  */
 class UserExceptionContext {
 
+  private static final String NEW_LINE = System.lineSeparator();
+
   private final String errorId;
   private final List<String> contextList;
 
@@ -133,17 +135,26 @@ String getErrorId() {
    * generate a context message
    * @return string containing all context information concatenated
    */
-  String generateContextMessage(boolean includeErrorIdAndIdentity) {
+  String generateContextMessage(boolean includeErrorIdAndIdentity, boolean includeSeeLogsMessage) {
     StringBuilder sb = new StringBuilder();
 
     for (String context : contextList) {
-      sb.append(context).append("\n");
+      sb.append(context)
+          .append(NEW_LINE);
+    }
+
+    if (includeSeeLogsMessage) {
+      sb.append(NEW_LINE)
+          .append("Please, refer to logs for more information.")
+          .append(NEW_LINE);
     }
 
     if (includeErrorIdAndIdentity) {
       // add identification infos
-      sb.append("\n[Error Id: ");
-      sb.append(errorId).append(" ");
+      sb.append(NEW_LINE)
+          .append("[Error Id: ")
+          .append(errorId)
+          .append(" ");
       if (endpoint != null) {
         sb.append("on ")
             .append(endpoint.getAddress())
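
Taken together, the two hunks above make SYSTEM and INTERNAL_ERROR messages
end with a pointer to the logs, while other error types keep the old format.
A minimal sketch of the resulting message assembly, assuming simplified
types (the real logic lives in UserException.generateMessage and
UserExceptionContext.generateContextMessage):

    import java.util.List;

    class MessageSketch {
      static String generate(String errorType, String message,
                             List<String> contextList, boolean seeLogsMessage) {
        String nl = System.lineSeparator();
        StringBuilder sb = new StringBuilder(errorType)
            .append(" ERROR: ").append(message).append("\n\n");
        for (String context : contextList) {
          sb.append(context).append(nl);
        }
        if (seeLogsMessage) { // true only for SYSTEM and INTERNAL_ERROR
          sb.append(nl)
              .append("Please, refer to logs for more information.")
              .append(nl);
        }
        return sb.toString();
      }
    }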
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 3c7ca8ebaf7..b68f574036a 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -81,6 +81,7 @@
   private final String tableName;
   private OperatorContext operatorContext;
   private VectorContainerWriter vectorWriter;
+  private DBDocumentReaderBase reader;
 
   private DrillBuf buffer;
 
@@ -195,7 +196,7 @@ public int next() {
     vectorWriter.reset();
 
     int recordCount = 0;
-    DBDocumentReaderBase reader = null;
+    reader = null;
 
     while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
       vectorWriter.setPosition(recordCount);
@@ -526,4 +527,18 @@ public void close() {
       table.close();
     }
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=")
+        .append(table.getPath());
+    if (reader != null) {
+      sb.append(", Document ID=")
+          .append(IdCodec.asString(reader.getId()));
+    }
+    sb.append(", reader=")
+        .append(reader)
+        .append(']');
+    return sb.toString();
+  }
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 86038c479e5..2db1d02e825 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -338,4 +338,9 @@ private boolean canAddNewRow(int rowCount) {
     return rowCount < TARGET_RECORD_COUNT &&
         operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
   }
+
+  @Override
+  public String toString() {
+    return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]";
+  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
index 354a61e4c4d..ba1cd30b50a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -430,4 +430,19 @@ protected boolean hasNextValue(Object value) {
     }
   }
 
+  @Override
+  public String toString() {
+    long position = -1;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: " + e.getMessage());
+    }
+    return getClass().getSimpleName() + "[Database=" + table.getDbName()
+        + ", Table=" + table.getTableName()
+        + ", Position=" + position
+        + "]";
+  }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 1b6e2111b1d..cd732a61cdd 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -279,6 +279,13 @@ public void close() throws Exception {
     AutoCloseables.close(resultSet, statement, connection);
   }
 
+  @Override
+  public String toString() {
+    return "JdbcRecordReader[sql=" + sql
+        + ", Plugin=" + storagePluginName
+        + "]";
+  }
+
   private abstract class Copier<T extends ValueVector.Mutator> {
     protected final int columnIndex;
     protected final ResultSet result;
@@ -478,5 +485,4 @@ void copy(int index) throws SQLException {
     }
 
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index d715ada9cd5..9559c3d8c98 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -60,6 +60,7 @@
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
   private final String kafkaMsgReader;
+  private int currentMessageCount;
 
   public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
@@ -105,27 +106,27 @@ public int next() {
     writer.allocate();
     writer.reset();
     Stopwatch watch = Stopwatch.createStarted();
-    int messageCount = 0;
+    currentMessageCount = 0;
 
     try {
       while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
         ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
         currentOffset = consumerRecord.offset();
-        writer.setPosition(messageCount);
+        writer.setPosition(currentMessageCount);
         messageReader.readMessage(consumerRecord);
-        if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+        if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
           break;
         }
       }
 
       messageReader.ensureAtLeastOneField();
-      writer.setValueCount(messageCount);
-      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+      writer.setValueCount(currentMessageCount);
+      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
       logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
           currentOffset);
-      return messageCount;
+      return currentMessageCount;
     } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
       throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
     }
   }
@@ -139,4 +140,15 @@ public void close() throws Exception {
     messageReader.close();
   }
 
+  @Override
+  public String toString() {
+    return "KafkaRecordReader[messageReader=" + messageReader
+        + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+        + ", currentOffset=" + currentOffset
+        + ", enableAllTextMode=" + enableAllTextMode
+        + ", readNumbersAsDouble=" + readNumbersAsDouble
+        + ", kafkaMsgReader=" + kafkaMsgReader
+        + ", currentMessageCount=" + currentMessageCount
+        + "]";
+  }
 }
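
The only behavioral change in the Kafka reader above is that the per-batch
counter moved from a local variable in next() to the field
currentMessageCount, so toString() can still report how many messages had
been processed once a failure is being dumped. The idiom in miniature, with
hypothetical names:

    import java.util.Iterator;

    class CountingReaderSketch {
      private int currentCount; // formerly a local variable in next()

      int next(Iterator<String> messages) {
        currentCount = 0;
        while (messages.hasNext()) {
          messages.next();
          currentCount++;
        }
        return currentCount;
      }

      @Override
      public String toString() {
        // still meaningful after next() has returned or thrown
        return "CountingReaderSketch[currentCount=" + currentCount + "]";
      }
    }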
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a62357daffd..40e9e129bba 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -105,4 +105,8 @@ public void close() throws IOException {
     }
   }
 
+  @Override
+  public String toString() {
+    return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 845738c486d..976b16d04d6 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,9 @@
   private OutputMutator output;
   private OperatorContext context;
 
+  private String lastColumnName;
+  private Type lastColumnType;
+
   private static class ProjectedColumnInfo {
     int index;
     ValueVector vv;
@@ -176,6 +179,8 @@ private void initCols(Schema schema) throws SchemaChangeException {
 
       final String name = col.getName();
       final Type kuduType = col.getType();
+      lastColumnName = name;
+      lastColumnType = kuduType;
       MinorType minorType = TYPES.get(kuduType);
       if (minorType == null) {
         logger.warn("Ignoring column that is unsupported.", UserException
@@ -326,4 +331,10 @@ private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeExc
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    return "KuduRecordReader[Column=" + lastColumnName
+        + ", Type=" + lastColumnType
+        + "]";
+  }
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index d0fa1581c57..76115763d47 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -77,4 +77,9 @@ public StoragePluginConfig getStorage() {
   public KuduStoragePlugin getPlugin() {
     return plugin;
   }
+
+  @Override
+  public String toString() {
+    return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]";
+  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index a79e39aa3c5..f5d1f2e0187 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -216,4 +216,9 @@ public int next() {
   public void close() {
   }
 
+  @Override
+  public String toString() {
+    Object reader = isBsonRecordReader ? bsonReader : jsonReader;
+    return "MongoRecordReader[reader=" + reader + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index da988dec4f8..521aba128c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -106,4 +106,12 @@ public boolean isBufferedOperator(QueryContext queryContext) {
     return queryContext == null ||
       1 < (int) queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
   }
+
+  @Override
+  public String toString() {
+    return "HashAggregate[groupByExprs=" + groupByExprs
+        + ", aggrExprs=" + aggrExprs
+        + ", cardinality=" + cardinality
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index d28d563103f..320bc6d9989 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -106,4 +106,15 @@ public float getCompletionFactor() {
   public int getOperatorType() {
     return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "OrderedPartitionSender[orderings=" + orderings
+        + ", ref=" + ref
+        + ", sendingWidth=" + sendingWidth
+        + ", recordsToSample=" + recordsToSample
+        + ", samplingFactor=" + samplingFactor
+        + ", completionFactor=" + completionFactor
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
index 29f8bb2fe3f..4ea710d8799 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -59,4 +59,9 @@ public SelectionVectorMode getSVMode() {
   public int getOperatorType() {
     return CoreOperatorType.PARTITION_LIMIT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "PartitionLimit[partitionColumn=" + partitionColumn + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 85ef7da2c4e..5d65c396a37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -71,4 +71,11 @@ public SelectionVectorMode getSVMode() {
   public int getOperatorType() {
     return CoreOperatorType.OLD_SORT_VALUE;
   }
+
+  @Override
+  public String toString() {
+    return "Sort[orderings=" + orderings
+        + ", reverse=" + reverse
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
index 54d1b4d72bc..d2a87d508b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
@@ -69,4 +69,11 @@ public int getOperatorType() {
     return CoreOperatorType.TOP_N_SORT_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "TopN[orderings=" + orderings
+        + ", reverse=" + reverse
+        + ", limit=" + limit
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
index 543c09f41da..3ddaa7f24ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -95,6 +95,17 @@ public boolean isFrameUnitsRows() {
     return frameUnitsRows;
   }
 
+  @Override
+  public String toString() {
+    return "WindowPOP[withins=" + withins
+        + ", aggregations=" + aggregations
+        + ", orderings=" + orderings
+        + ", frameUnitsRows=" + frameUnitsRows
+        + ", start=" + start
+        + ", end=" + end
+        + "]";
+  }
+
   @JsonTypeName("windowBound")
   public static class Bound {
     private final boolean unbounded;
@@ -117,6 +128,11 @@ public boolean isCurrent() {
     public long getOffset() {
       return offset;
     }
+
+    @Override
+    public String toString() {
+      return "Bound[unbounded=" + unbounded + ", offset=" + offset + "]";
+    }
   }
 
   public static Bound newBound(RexWindowBound windowBound) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index e1482781fe9..9142a2aded8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.DeferredException;
@@ -123,6 +124,28 @@ public void receivingFragmentFinished(final FragmentHandle handle) {
     logger.warn("Currently not handling FinishedFragment message");
   }
 
+  @Override
+  public void dumpBatches() {
+    final int numberOfBatchesToDump = 2;
+    logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
+    // As batches are stored in a 'flat' List there is a need to filter out the failed batch
+    // and a few of its parent (actual number of batches is set by a constant defined above)
+    List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
+    for (int i = operators.size() - 1; i >= 0; i--) {
+      CloseableRecordBatch batch = operators.get(i);
+      if (batch.hasFailed()) {
+        failedBatchStack.add(0, batch);
+        if (failedBatchStack.size() == numberOfBatchesToDump) {
+          break;
+        }
+      }
+    }
+    for (CloseableRecordBatch batch : failedBatchStack) {
+      batch.dump();
+    }
+    logger.error("Batch dump completed.");
+  }
+
   @Override
   public void close() throws Exception {
     // We want to account for the time spent waiting here as Wait time in the operator profile
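
dumpBatches() scans the flat operator list from the downstream end and keeps
at most the last two batches that report failure, so a dump shows the failing
batch together with its nearest failed parent, in pipeline order. The
selection logic in isolation, assuming a simplified batch type:

    import java.util.LinkedList;
    import java.util.List;

    class DumpSelectionSketch {
      interface Batch {
        boolean hasFailed();
        void dump();
      }

      // Walk downstream-to-upstream; keep up to two failed batches,
      // inserting at the front so pipeline order is preserved.
      static List<Batch> selectBatchesToDump(List<Batch> operators) {
        List<Batch> failed = new LinkedList<>();
        for (int i = operators.size() - 1; i >= 0; i--) {
          Batch batch = operators.get(i);
          if (batch.hasFailed()) {
            failed.add(0, batch);
            if (failed.size() == 2) {
              break;
            }
          }
        }
        return failed;
      }
    }

For operators [scan, filter, project] where filter and project have failed,
this yields [filter, project], and each is then dump()ed.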
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index df0f89b9c1e..34f21312cb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -44,4 +44,10 @@
    * @param handle The handle pointing to the downstream receiver that does not need anymore data.
    */
   void receivingFragmentFinished(FragmentHandle handle);
+
+  /**
+   * Dump failed batches' state preceded by its parent's state to logs. Invoked when there is a
+   * failure during fragment execution.
+   */
+  void dumpBatches();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cd24a4ca592..dc8dd0fd853 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,6 +83,10 @@
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
   private final RecordBatchStatsContext batchStatsContext;
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
+
   /**
    *
    * @param context
@@ -160,7 +164,8 @@ public void kill(boolean sendUpstream) {
   @Override
   public IterOutcome next() {
     if (done) {
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
     oContext.getStats().startProcessing();
     try {
@@ -168,7 +173,8 @@ public IterOutcome next() {
         if (currentReader == null && !getNextReaderIfHas()) {
           releaseAssets(); // All data has been read. Release resource.
           done = true;
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          return lastOutcome;
         }
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
         currentReader.allocate(mutator.fieldVectorMap());
@@ -191,7 +197,8 @@ public IterOutcome next() {
           // This could happen when data sources have a non-trivial schema with 0 row.
           container.buildSchema(SelectionVectorMode.NONE);
           schema = container.getSchema();
-          return IterOutcome.OK_NEW_SCHEMA;
+          lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+          return lastOutcome;
         }
 
         // Handle case of same schema.
@@ -199,11 +206,13 @@ public IterOutcome next() {
             continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
         } else {
           // return OK if recordCount > 0 && ! isNewSchema
-          return IterOutcome.OK;
+          lastOutcome = IterOutcome.OK;
+          return lastOutcome;
         }
       }
     } catch (OutOfMemoryException ex) {
       clearFieldVectorMap();
+      lastOutcome = IterOutcome.STOP;
       throw UserException.memoryError(ex).build(logger);
     } catch (ExecutionSetupException e) {
       if (currentReader != null) {
@@ -213,12 +222,15 @@ public IterOutcome next() {
           logger.error("Close failed for reader " + currentReaderClassName, e2);
         }
       }
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(e)
           .addContext("Setup failed for", currentReaderClassName)
           .build(logger);
     } catch (UserException ex) {
+      lastOutcome = IterOutcome.STOP;
       throw ex;
     } catch (Exception ex) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
@@ -559,4 +571,14 @@ private boolean verifyImplcitColumns(int numReaders, List<Map<String, String>> i
 
     return true;
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
+  }
 }
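
ScanBatch now records the outcome of every next() call (as do
SpilledRecordbatch and OperatorRecordBatch later in this diff), so that
hasFailed() can answer truthfully after an exception has already propagated
up the stack. The idiom distilled, with a simplified outcome enum:

    class OutcomeTrackingSketch {
      enum IterOutcome { OK, NONE, STOP }

      private IterOutcome lastOutcome;

      IterOutcome next(boolean moreData) {
        try {
          // ... read the next batch here; this may throw ...
          lastOutcome = moreData ? IterOutcome.OK : IterOutcome.NONE;
          return lastOutcome;
        } catch (RuntimeException e) {
          lastOutcome = IterOutcome.STOP; // mark the failure before rethrowing
          throw e;
        }
      }

      boolean hasFailed() {
        return lastOutcome == IterOutcome.STOP;
      }
    }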
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index ba4b94a8979..22dfdf09074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -74,6 +74,7 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
  * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
@@ -185,7 +186,7 @@ public IterOutcome innerNext() {
     // Reset the TopN state for next iteration
     resetTopNState();
 
-    try{
+    try {
       boolean incomingHasSv2 = false;
       switch (incoming.getSchema().getSelectionVectorMode()) {
         case NONE: {
@@ -693,4 +694,10 @@ public SelectionVector4 getSelectionVector4() {
       return sv4;
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " +
+        "batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 65d0c54d1c0..3a8485a24f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -209,4 +209,10 @@ public void close() {
     closeWriter();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WriterRecordBatch[container={}, popConfig={}, counter={}, fragmentUniqueId={}, schema={}]",
+        container, popConfig, counter, fragmentUniqueId, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 9de9aae06b3..80d25edb13a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -500,4 +501,12 @@ protected void killIncoming(boolean sendUpstream) {
     wasKilled = true;
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " +
+            "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
+        container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema,
+        wasKilled, numGroupByExprs, numAggrExprs, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index e8ae30e6fe8..f6dd3da425e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -25,6 +25,7 @@
 
 import javax.inject.Named;
 
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -1603,6 +1604,14 @@ private void updateStats(HashTable[] htables) {
     }
   }
 
+  @Override
+  public String toString() {
+    // The fields are excluded because they are passed from HashAggBatch
+    String[] excludedFields = new String[] {
+        "baseHashTable", "incoming", "outgoing", "context", "oContext", "allocator", "htables", "newIncoming"};
+    return ReflectionToStringBuilder.toStringExclude(this, excludedFields);
+  }
+
   // Code-generated methods (implemented in HashAggBatch)
   public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
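
HashAggTemplate has too many fields to enumerate by hand, so its toString()
relies on Apache Commons Lang's ReflectionToStringBuilder and only lists the
fields to exclude. A self-contained example of that call (requires
commons-lang3 on the classpath; the fields here are hypothetical):

    import org.apache.commons.lang3.builder.ReflectionToStringBuilder;

    class ReflectionToStringSketch {
      private final int batchCount = 3;
      private final String phase = "SPILL";
      private final Object incoming = new Object(); // borrowed, not worth printing

      @Override
      public String toString() {
        // Reflectively prints every field except the excluded ones.
        return ReflectionToStringBuilder.toStringExclude(this, "incoming");
      }

      public static void main(String[] args) {
        // e.g. ReflectionToStringSketch@1b6d3586[batchCount=3,phase=SPILL]
        System.out.println(new ReflectionToStringSketch());
      }
    }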
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 7ebce2b4ad5..822a810d162 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -40,6 +41,9 @@
  * A class to replace "incoming" - instead scanning a spilled partition file
  */
 public class SpilledRecordbatch implements CloseableRecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private InputStream spillStream;
   private int spilledBatches;
@@ -49,6 +53,9 @@
   private String spillFile;
   VectorAccessibleSerializable vas;
   private IterOutcome initialOutcome;
+  // Represents last outcome of next(). If an Exception is thrown
+  // during the method's execution a value IterOutcome.STOP will be assigned.
+  private IterOutcome lastOutcome;
 
   public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
     this.context = context;
@@ -66,6 +73,7 @@ public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext
     }
 
     initialOutcome = next(); // initialize the container
+    lastOutcome = initialOutcome;
   }
 
   @Override
@@ -126,14 +134,19 @@ public void kill(boolean sendUpstream) {
   @Override
   public IterOutcome next() {
 
-    if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
+    if (!context.getExecutorState().shouldContinue()) {
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    }
 
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
-      return IterOutcome.NONE;
+      lastOutcome = IterOutcome.NONE;
+      return lastOutcome;
     }
 
     if ( spillStream == null ) {
+      lastOutcome = IterOutcome.STOP;
       throw new IllegalStateException("Spill stream was null");
     }
 
@@ -152,11 +165,16 @@ public IterOutcome next() {
         container = vas.get();
       }
     } catch (IOException e) {
+      lastOutcome = IterOutcome.STOP;
       throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     }
 
     spilledBatches--; // one less batch to read
-    return IterOutcome.OK;
+    lastOutcome = IterOutcome.OK;
+    return lastOutcome;
   }
 
   /**
@@ -164,6 +182,17 @@ public IterOutcome next() {
    */
   public IterOutcome getInitialOutcome() { return initialOutcome; }
 
+  @Override
+  public void dump() {
+    logger.error("SpilledRecordbatch[container={}, spilledBatches={}, schema={}, spillFile={}, spillSet={}]",
+        container, spilledBatches, schema, spillFile, spillSet);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   /**
    * Note: ignoring any IO errors (e.g. file not found)
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c42f9bf5b88..2b9b31783f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -648,4 +648,10 @@ public void close() {
   protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
+
+  @Override
+  public void dump() {
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
+        container, popConfig, aggregator, incomingSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index f30616bacd4..4bde7ab1708 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -498,6 +498,16 @@ private void addRecordInc(int index) {
   public void cleanup() {
   }
 
+  @Override
+  public String toString() {
+    return "StreamingAggTemplate[underlyingIndex=" + underlyingIndex
+        + ", previousIndex=" + previousIndex
+        + ", currentIndex=" + currentIndex
+        + ", addedRecordCount=" + addedRecordCount
+        + ", outputCount=" + outputCount
+        + "]";
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b8d4e7644d2..179e6c148e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -43,6 +43,9 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private Filterer filter;
@@ -196,4 +199,9 @@ protected Filterer generateSV2Filterer() throws SchemaChangeException {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 7b0183bdab8..483a9ede452 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -119,4 +119,11 @@ public abstract boolean doEval(@Named("inIndex") int inIndex,
                                  @Named("outIndex") int outIndex)
                           throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "FilterTemplate2[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + ", svMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index e09ed75a0f9..d85d6f78852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -64,4 +64,10 @@ private void doTransfers(){
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
+  @Override
+  public String toString() {
+    return "FilterTemplate4[outgoingSelectionVector=" + outgoingSelectionVector
+        + ", incomingSelectionVector=" + incomingSelectionVector
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 7faaaa5ab0b..bc21580d369 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -247,4 +247,11 @@ private void computeBitSet(int fieldId, BloomFilter bloomFilter, BitSet bitSet)
       }
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public void dump() {
+    logger.error("RuntimeFilterRecordBatch[container={}, selectionVector={}, toFilterFields={}, "
+        + "originalRecordCount={}, batchSchema={}]",
+        container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 1623319c398..86ddcd151fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -532,4 +532,10 @@ public void close() {
     updateStats();
     super.close();
   }
+
+  @Override
+  public void dump() {
+    logger.error("FlattenRecordbatch[hasRemainder={}, remainderIndex={}, recordCount={}, flattener={}, container={}]",
+        hasRemainder, remainderIndex, recordCount, flattener, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index e59abacb3b3..fe38244bf1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -175,4 +175,15 @@ public abstract void doSetup(@Named("context") FragmentContext context,
                                @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean doEval(@Named("inIndex") int inIndex,
                                  @Named("outIndex") int outIndex) throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "FlattenTemplate[svMode=" + svMode
+        + ", fieldToFlatten=" + fieldToFlatten
+        + ", valueIndex=" + valueIndex
+        + ", outputLimit=" + outputLimit
+        + ", innerValueIndex=" + innerValueIndex
+        + ", currentInnerValueIndex=" + currentInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5dc91b..89ab8d4a4c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1276,4 +1276,10 @@ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, I
     return hj;
   }
 
+  @Override
+  public void dump() {
+    logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
+            " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
+        joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 639f757eccf..71abeda1139 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -432,4 +432,17 @@ public void changeToFinalProbeState() {
       (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
         ProbeState.DONE; // else we're done
   }
+
+  @Override
+  public String toString() {
+    return "HashJoinProbeTemplate[container=" + container
+        + ", probeSchema=" + probeSchema
+        + ", joinType=" + joinType
+        + ", recordsToProcess=" + recordsToProcess
+        + ", recordsProcessed=" + recordsProcessed
+        + ", outputRecords=" + outputRecords
+        + ", probeState=" + probeState
+        + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 1aaf5e2028f..242687f040d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1205,4 +1205,12 @@ private void populateExcludedField(PhysicalOperator lateralPop) {
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " +
+            "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
+        container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex,
+        leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 72f776a5505..d502c4f9dea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -547,4 +547,11 @@ private LogicalExpression materializeExpression(LogicalExpression expression, It
     }
     return materializedExpr;
   }
+
+  @Override
+  public void dump() {
+    logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}," +
+            " rightIterator={}, joinStatus={}, joinType={}]",
+        container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status, joinType);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index e2f93ecf245..6b0c7497638 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -459,4 +459,12 @@ protected void killIncoming(boolean sendUpstream) {
   public int getRecordCount() {
     return outputRecords;
   }
+
+  @Override
+  public void dump() {
+    logger.error("NestedLoopJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "leftSchema={}, rightSchema={}, outputRecords={}, rightContainer={}, rightCounts={}]",
+        container, left, right, leftUpstream, rightUpstream,
+        leftSchema, rightSchema, outputRecords, rightContainer, rightCounts);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 784d9553520..bb491872eb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -37,7 +37,7 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -58,46 +58,46 @@ public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch in
   @Override
   public IterOutcome innerNext() {
     if (!first && !needMoreRecords(numberOfRecords)) {
-        outgoingSv.setRecordCount(0);
-        incoming.kill(true);
+      outgoingSv.setRecordCount(0);
+      incoming.kill(true);
 
-        IterOutcome upStream = next(incoming);
+      IterOutcome upStream = next(incoming);
+      if (upStream == IterOutcome.OUT_OF_MEMORY) {
+        return upStream;
+      }
+
+      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
+        }
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
+        }
+        upStream = next(incoming);
         if (upStream == IterOutcome.OUT_OF_MEMORY) {
           return upStream;
         }
-
-        while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-          upStream = next(incoming);
-          if (upStream == IterOutcome.OUT_OF_MEMORY) {
-            return upStream;
-          }
+      }
+      // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
+      if (upStream == EMIT) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
         }
-        // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-        if (upStream == EMIT) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-
-          // clear memory for incoming sv (if any)
-          if (incomingSv != null) {
-            incomingSv.clear();
-          }
-
-          refreshLimitState();
-          return upStream;
+
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
         }
-        // other leaf operator behave as before.
-        return NONE;
+
+        refreshLimitState();
+        return upStream;
       }
+      // other leaf operator behave as before.
+      return NONE;
+    }
     return super.innerNext();
   }
 
@@ -266,4 +266,10 @@ private void refreshLimitState() {
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
     first = true;
   }
+
+  @Override
+  public void dump() {
+    logger.error("LimitRecordBatch[container={}, offset={}, numberOfRecords={}, incomingSV={}, outgoingSV={}]",
+        container, recordStartOffset, numberOfRecords, incomingSv, outgoingSv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index fe1660fd5b1..ed7b265524d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -40,7 +40,7 @@
  * implicit column for rowId for each row.
  */
 public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -250,4 +250,12 @@ private void refreshConfigParameter() {
     numberOfRecords = (popConfig.getLast() == null) ?
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
   }
+
+  @Override
+  public void dump() {
+    logger.error("PartitionLimitRecordBatch[container={}, popConfig={}, incomingSV={}, outgoingSV={},"
+            + " recordStartOffset={}, numberOfRecords={}, partitionId={}, unionTypeEnabled={}, state={}]",
+        container, popConfig, incomingSv, outgoingSv, recordStartOffset, numberOfRecords,
+        partitionId, unionTypeEnabled, state);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 791b24aab0e..12ee668198a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.mergereceiver;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -827,4 +828,11 @@ public void close() {
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, "
+            + "tempBatchHolder={}, inputCounts={}, outputCounts={}]",
+        container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets),
+        Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index a3aa11b7bba..63a01212249 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -646,4 +646,10 @@ protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeExcepti
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " +
+            "recordsSampled={}, recordCount={}]",
+        container, popConfig, partitionVectors, partitions, recordsSampled, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bbcb758319a..938540014f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -247,4 +247,9 @@ public static RecordBatchDataWrapper outOfMemory() {
     }
   }
 
+  @Override
+  public void dump() {
+    logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]",
+        container, recordCount, schema, stop);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4d55f00340f..8ea15d3cb89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -898,4 +898,10 @@ protected IterOutcome handleNullInput() {
     wasNone = true;
     return IterOutcome.OK_NEW_SCHEMA;
   }
+
+  @Override
+  public void dump() {
+    logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]",
+        projector, hasRemainder, remainderIndex, recordCount, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 02ccd4b0d28..2f1aa0242cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -110,4 +110,10 @@ public abstract void doEval(@Named("inIndex") int inIndex,
                               @Named("outIndex") int outIndex)
                        throws SchemaChangeException;
 
+  @Override
+  public String toString() {
+    return "Projector[vector2=" + vector2
+        + ", selectionVectorMode=" + svMode
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 620f150eaa8..e0beab14409 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -55,6 +55,7 @@
 
   private final OperatorDriver driver;
   private final BatchAccessor batchAccessor;
+  private IterOutcome lastOutcome;
 
   public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
     OperatorContext opContext = context.newOperatorContext(config);
@@ -143,7 +144,12 @@ public void kill(boolean sendUpstream) {
   public IterOutcome next() {
     try {
       driver.operatorContext().getStats().startProcessing();
-      return driver.next();
+      lastOutcome = driver.next();
+      return lastOutcome;
+    } catch (Exception e) {
+      // mark batch as failed
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       driver.operatorContext().getStats().stopProcessing();
     }
@@ -158,4 +164,14 @@ public void close() {
   public VectorContainer getContainer() {
     return batchAccessor.getOutgoingContainer();
   }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome);
+  }
 }
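
The hunk above is one instance of a pattern this PR applies across the batch implementations:
cache the outcome of the most recent next() call so that hasFailed() can be answered after an
error has already propagated. Reduced to a minimal, hedged sketch (the class and its child
field are illustrative, not code from this PR):

    private IterOutcome lastOutcome;

    @Override
    public IterOutcome next() {
      try {
        lastOutcome = child.next();       // cache the most recent outcome
        return lastOutcome;
      } catch (Exception e) {
        lastOutcome = IterOutcome.STOP;   // any exception marks the batch as failed
        throw e;                          // rethrow so callers still see the original error
      }
    }

    @Override
    public boolean hasFailed() {
      return lastOutcome == IterOutcome.STOP;
    }
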
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fc49d438325..fcbb10efde5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -213,4 +213,8 @@ protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("SortBatch[popConfig={}, container={}, sorter={}, schema={}]", popConfig, container, sorter, schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index da476cc3787..b3c6a7f69ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -70,4 +70,8 @@ public int compare(int leftIndex, int rightIndex) {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
 
+  @Override
+  public String toString() {
+    return "SortTemplate[vector4=" + vector4 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1471d5e558f..a8c362200f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -92,4 +92,9 @@ public void close() {
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
+
+  @Override
+  public void dump() {
+    logger.error("RemovingRecordBatch[container={}, state={}, copier={}]", container, state, copier);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 50cb26b09bf..d56f848d547 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -39,7 +39,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-/* TraceRecordBatch contains value vectors which are exactly the same
+/** TraceRecordBatch contains value vectors which are exactly the same
  * as the incoming record batch's value vectors. If the incoming
  * record batch has a selection vector (type 2) then TraceRecordBatch
  * will also contain a selection vector.
@@ -171,4 +171,8 @@ public void close() {
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("TraceRecordBatch[filename={}, logLocation={}, selectionVector={}]", getFileName(), logLocation, sv);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 7e16d6a35c6..e83fddfb493 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -440,4 +440,9 @@ public void close() {
       batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnionAllRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+            + "recordCount={}]", container, left, right, leftUpstream, rightUpstream, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 285481f3a93..508999f333b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -191,4 +191,14 @@ public void close() {
       transfers = null;
     }
   }
+
+  @Override
+  public String toString() {
+    return "UnnestImpl[svMode=" + svMode
+        + ", outputLimit=" + outputLimit
+        + ", valueIndex=" + valueIndex
+        + ", innerValueIndex=" + innerValueIndex
+        + ", runningInnerValueIndex=" + runningInnerValueIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 5f6396731a9..1c8336d2362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -241,7 +241,7 @@ public IterOutcome innerNext() {
     }
   }
 
-    @Override
+  @Override
   public VectorContainer getOutgoingContainer() {
     return this.container;
   }
@@ -446,4 +446,9 @@ public void close() {
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("UnnestRecordBatch[container={}, unnest={}, hasRemainder={}, remainderIndex={}, " +
+            "unnestFieldMetadata={}]", container, unnest, hasRemainder, remainderIndex, unnestFieldMetadata);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8cdc0a1c78a..a6ebef0e621 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -64,6 +64,9 @@
   private boolean first = true;
   private final UnorderedReceiver config;
   private final OperatorContext oContext;
+  // Represents the last outcome of next(). If an Exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -156,7 +159,7 @@ private RawFragmentBatch getNextBatch() throws IOException {
   public IterOutcome next() {
     batchLoader.resetRecordCount();
     stats.startProcessing();
-    try{
+    try {
       RawFragmentBatch batch;
       try {
         stats.startWait();
@@ -174,15 +177,17 @@ public IterOutcome next() {
       first = false;
 
       if (batch == null) {
+        lastOutcome = IterOutcome.NONE;
         batchLoader.zero();
         if (!context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
+          lastOutcome = IterOutcome.STOP;
         }
-        return IterOutcome.NONE;
+        return lastOutcome;
       }
 
       if (context.getAllocator().isOverLimit()) {
-        return IterOutcome.OUT_OF_MEMORY;
+        lastOutcome = IterOutcome.OUT_OF_MEMORY;
+        return lastOutcome;
       }
 
       final RecordBatchDef rbd = batch.getHeader().getDef();
@@ -195,14 +200,19 @@ public IterOutcome next() {
       if(schemaChanged) {
         this.schema = batchLoader.getSchema();
         stats.batchReceived(0, rbd.getRecordCount(), true);
-        return IterOutcome.OK_NEW_SCHEMA;
+        lastOutcome = IterOutcome.OK_NEW_SCHEMA;
       } else {
         stats.batchReceived(0, rbd.getRecordCount(), false);
-        return IterOutcome.OK;
+        lastOutcome = IterOutcome.OK;
       }
-    } catch(SchemaChangeException | IOException ex) {
+      return lastOutcome;
+    } catch (SchemaChangeException | IOException ex) {
       context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
+      lastOutcome = IterOutcome.STOP;
+      return lastOutcome;
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -270,4 +280,14 @@ public void interrupted(final InterruptedException e) {
       }
     }
   }
+
+  @Override
+  public void dump() {
+    logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema);
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 88f4c7dfd26..1ea3895735b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -322,8 +322,7 @@ public IterOutcome next() {
       }
 
       return batchState;
-    }
-    catch (RuntimeException | Error e) {
+    } catch (RuntimeException | Error e) {
       exceptionState = e;
       logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
                    instNum, batchTypeName, prevBatchState, exceptionState,
@@ -366,4 +365,14 @@ public VectorContainer getContainer() {
 
   public RecordBatch getIncoming() { return incoming; }
 
+  @Override
+  public boolean hasFailed() {
+    return exceptionState != null || batchState == STOP;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
+           + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5cd6de775d6..1e477ec1fb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -295,6 +295,17 @@ public void cleanup() {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", frameLastRow=" + frameLastRow
+        + ", remainingRows=" + remainingRows
+        + ", partialPartition=" + partialPartition
+        + "]";
+  }
+
   /**
    * called once for each peer row of the current frame.
    * @param index of row to aggregate
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index 55c27c1d48a..cc7a04d1568 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -298,6 +298,17 @@ public void cleanup() {
     internal.clear();
   }
 
+  @Override
+  public String toString() {
+    return "FrameSupportTemplate[internal=" + internal
+        + ", outputCount=" + outputCount
+        + ", current=" + current
+        + ", requireFullPartition=" + requireFullPartition
+        + ", partition=" + partition
+        + "]";
+  }
+
+
   /**
    * called once for each row after we evaluate all peer rows. Used to write a value in the row
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index 7d98724ab0f..a9baea9596c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -106,4 +106,9 @@ public SelectionVector4 getSelectionVector4() {
   public void clear() {
     container.clear();
   }
+
+  @Override
+  public String toString() {
+    return "WindowDataBatch[container=" + container + ", recordCount=" + recordCount + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index a372a3cf9c7..59e84ef5c8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
@@ -426,4 +427,10 @@ protected void killIncoming(boolean sendUpstream) {
   public int getRecordCount() {
     return framers[0].getOutputCount();
   }
+
+  @Override
+  public void dump() {
+    logger.error("WindowFrameRecordBatch[container={}, popConfig={}, framers={}, schema={}]",
+        container, popConfig, Arrays.toString(framers), schema);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0edf974e16b..262a241b035 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -824,4 +824,8 @@ protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
 
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sorter={}, mSorter={}, container={}]", schema, sorter, mSorter, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 5ebec50f504..9b10d430ba3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -206,4 +206,13 @@ public abstract void doSetup(@Named("context") FragmentContext context,
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                              @Named("rightIndex") int rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "MSortTemplate[vector4=" + vector4
+        + ", aux=" + aux
+        + ", runStarts=" + runStarts
+        + ", desiredRecordBatchCount=" + desiredRecordBatchCount
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index de783dfa921..57d2ec3512e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -83,4 +83,9 @@ public abstract void doSetup(@Named("context") FragmentContext context,
   public abstract int doEval(@Named("leftIndex") char leftIndex,
                              @Named("rightIndex") char rightIndex)
                       throws SchemaChangeException;
+
+  @Override
+  public String toString() {
+    return "SinglebatchSorterTemplate[vector2=" + vector2 + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 7db4d3bb120..8fc4a749fe6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -698,4 +698,11 @@ private SortImpl createNewSortImpl() {
     SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
     return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
   }
+
+  @Override
+  public void dump() {
+    logger.error("ExternalSortBatch[schema={}, sortState={}, sortConfig={}, outputWrapperContainer={}, "
+            + "outputSV4={}, container={}]",
+        schema, sortState, sortConfig, outputWrapperContainer, outputSV4, container);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index e592ccb6ab6..fbcf1ae2915 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -137,6 +137,15 @@ private void logConfig() {
                   mergeBatchSize, mSortBatchSize);
   }
 
+  @Override
+  public String toString() {
+    return "SortConfig[spillFileSize=" + spillFileSize
+        + ", spillBatchSize=" + spillBatchSize
+        + ", mergeBatchSize=" + mergeBatchSize
+        + ", mSortBatchSize=" + mSortBatchSize
+        + "]";
+  }
+
   public long maxMemory() { return maxMemory; }
   public int mergeLimit() { return mergeLimit; }
   public long spillFileSize() { return spillFileSize; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index ac30e947152..79294cc5ea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -592,4 +592,12 @@ public void close() {
       throw ex;
     }
   }
+
+  @Override
+  public String toString() {
+    return "SortImpl[config=" + config
+        + ", outputBatch=" + outputBatch
+        + ", sizer=" + sizer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c38de2d2b35..362ea29924b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -47,6 +47,10 @@
   protected final boolean unionTypeEnabled;
   protected BatchState state;
 
+  // Represents the last outcome of next(). If an Exception is thrown
+  // during the method's execution, IterOutcome.STOP is assigned.
+  private IterOutcome lastOutcome;
+
   protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
     this(popConfig, context, true, context.newOperatorContext(popConfig));
   }
@@ -113,7 +117,7 @@ public final IterOutcome next(final RecordBatch b) {
   }
 
   public final IterOutcome next(final int inputIndex, final RecordBatch b){
-    IterOutcome next = null;
+    IterOutcome next;
     stats.stopProcessing();
     try{
       if (!context.getExecutorState().shouldContinue()) {
@@ -132,15 +136,15 @@ public final IterOutcome next(final int inputIndex, final RecordBatch b){
     }
 
     switch(next) {
-    case OK_NEW_SCHEMA:
-      stats.batchReceived(inputIndex, b.getRecordCount(), true);
-      break;
-    case OK:
-    case EMIT:
-      stats.batchReceived(inputIndex, b.getRecordCount(), false);
-      break;
-    default:
-      break;
+      case OK_NEW_SCHEMA:
+        stats.batchReceived(inputIndex, b.getRecordCount(), true);
+        break;
+      case OK:
+      case EMIT:
+        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        break;
+      default:
+        break;
     }
 
     return next;
@@ -155,27 +159,38 @@ public final IterOutcome next() {
           buildSchema();
           switch (state) {
             case DONE:
-              return IterOutcome.NONE;
+              lastOutcome = IterOutcome.NONE;
+              break;
             case OUT_OF_MEMORY:
               // because we don't support schema changes, it is safe to fail the query right away
               context.getExecutorState().fail(UserException.memoryError()
                 .build(logger));
               // FALL-THROUGH
             case STOP:
-              return IterOutcome.STOP;
+              lastOutcome = IterOutcome.STOP;
+              break;
             default:
               state = BatchState.FIRST;
-              return IterOutcome.OK_NEW_SCHEMA;
+              lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+              break;
           }
+          break;
         }
         case DONE: {
-          return IterOutcome.NONE;
+          lastOutcome = IterOutcome.NONE;
+          break;
         }
         default:
-          return innerNext();
+          lastOutcome = innerNext();
+          break;
       }
+      return lastOutcome;
     } catch (final SchemaChangeException e) {
+      lastOutcome = IterOutcome.STOP;
       throw new DrillRuntimeException(e);
+    } catch (Exception e) {
+      lastOutcome = IterOutcome.STOP;
+      throw e;
     } finally {
       stats.stopProcessing();
     }
@@ -245,6 +260,11 @@ public VectorContainer getContainer() {
     return  container;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return lastOutcome == IterOutcome.STOP;
+  }
+
   public RecordBatchStatsContext getRecordBatchStatsContext() {
     return batchStatsContext;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index cb3a61c4a7e..dda4ef52f96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -59,6 +59,5 @@ public void setIncoming(LateralContract incoming) {
     setIncoming(incoming.getIncoming());
     lateral = incoming;
   }
-
 }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c65827c2be6..7473c8caf2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -320,4 +320,19 @@ public boolean isError() {
    * buffers.
    */
   WritableBatch getWritableBatch();
+
+  /**
+   * Performs a dump of this batch's state to the logs.
+   */
+  void dump();
+
+  /**
+   * Use this method to check whether the batch has failed. Currently used when logging a
+   * {@code RecordBatch}'s state via the {@link #dump()} method.
+   *
+   * @return {@code true} if {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP}
+   * was returned by this batch's (or a child's) {@link #next()} invocation, or if an
+   * {@code Exception} was thrown during execution of the batch; {@code false} otherwise
+   */
+  boolean hasFailed();
 }
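
Together, dump() and hasFailed() are the contract that each operator batch in this PR implements.
Because AbstractRecordBatch (earlier in this diff) already derives hasFailed() from its cached
lastOutcome, a concrete operator usually only needs to log its own state. A hypothetical subclass
(class name, config type and the recordCount field are illustrative, not part of this PR) might
satisfy the contract like this:

    public class MyFilterRecordBatch extends AbstractRecordBatch<Filter> {
      // ... operator logic ...

      @Override
      public void dump() {
        // Log whatever state helps diagnose the failed query; container and
        // popConfig are assumed to be inherited from the Drill base classes.
        logger.error("MyFilterRecordBatch[container={}, popConfig={}, recordCount={}]",
            container, popConfig, recordCount);
      }
    }
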
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e841f2f3bc0..696d6db27f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -292,4 +292,12 @@ public void clear() {
     container.clear();
     resetRecordCount();
   }
+
+  @Override
+  public String toString() {
+    return "RecordBatchLoader[container=" + container
+        + ", valueCount=" + valueCount
+        + ", schema=" + schema
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 47b11a68538..04cf32e15eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -367,4 +367,18 @@ public void close() {
     clear();
     clearInflightBatches();
   }
+
+  @Override
+  public String toString() {
+    return "RecordIterator[outerPosition=" + outerPosition
+        + ", innerPosition=" + innerPosition
+        + ", innerRecordCount=" + innerRecordCount
+        + ", totalRecordCount=" + totalRecordCount
+        + ", startBatchPosition=" + startBatchPosition
+        + ", markedInnerPosition" + markedInnerPosition
+        + ", markedOuterPosition=" + markedOuterPosition
+        + ", lastOutcome=" + lastOutcome
+        + ", inputIndex=" + inputIndex
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 9dfa129c75f..e4278ba8861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -107,4 +107,14 @@ public void close() throws Exception {
 
   @Override
   public VectorContainer getContainer() { return null; }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("SchemalessBatch[]");
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index 4063e555636..c588f25f812 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -28,6 +28,9 @@
  * Wrap a VectorContainer into a record batch.
  */
 public class SimpleRecordBatch implements RecordBatch {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
   private VectorContainer container;
   private FragmentContext context;
 
@@ -99,4 +102,14 @@ public VectorContainer getOutgoingContainer() {
   public VectorContainer getContainer() {
      return container;
   }
+
+  @Override
+  public void dump() {
+    logger.error("SimpleRecordBatch[container=" + container + "]");
+  }
+
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index a0b47ed89db..4f4f88d4907 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -161,4 +161,13 @@ public static int getRecordIndex(int sv4Index) {
   public void close() {
     clear();
   }
+
+  @Override
+  public String toString() {
+    return "SelectionVector4[data=" + data
+        + ", recordCount=" + recordCount
+        + ", start=" + start
+        + ", length=" + length
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 7accdc4f26a..9314da67893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -104,5 +104,4 @@ public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryExcep
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return GroupScan.ALL_COLUMNS;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index a0dda01d40d..edd91d157be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,8 +26,8 @@
 import org.apache.drill.exec.vector.ValueVector;
 
 public interface RecordReader extends AutoCloseable {
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+  long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+  long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
   /**
    * Configure the RecordReader with the provided schema and the record batch that should be written to.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
index 68c3c369da9..31c01030852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -222,4 +222,9 @@ private void applyStrategy(FileSystem fs, Path path, FsPermission permission, bo
       fs.deleteOnExit(path);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "StorageStrategy[umask=" + umask + ", deleteOnExist=" + deleteOnExit + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 6945fff3674..7668130068e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -66,7 +66,7 @@
 /**
  * A RecordReader implementation for Avro data files.
  *
- * @see RecordReader
+ * @see org.apache.drill.exec.store.RecordReader
  */
 public class AvroRecordReader extends AbstractRecordReader {
 
@@ -398,4 +398,17 @@ public void close() {
       }
     }
   }
+
+  @Override
+  public String toString() {
+    long currentPosition = -1L;
+    try {
+      currentPosition = reader.tell();
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: {}", e.getMessage());
+    }
+    return "AvroRecordReader[File=" + hadoop
+        + ", Position=" + currentPosition
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
index 5d9e1053f35..2580010e20f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
@@ -23,7 +23,6 @@
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -54,6 +53,8 @@
   private final boolean readNumbersAsDouble;
   protected DrillBuf workBuf;
   private String currentFieldName;
+  // Used for error context
+  private BsonReader reader;
 
   public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
     this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
@@ -67,6 +68,7 @@ public BsonRecordReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean r
   }
 
   public void write(ComplexWriter writer, BsonReader reader) throws IOException {
+    this.reader = reader;
     reader.readStartDocument();
     BsonType readBsonType = reader.getCurrentBsonType();
     switch (readBsonType) {
@@ -364,17 +366,20 @@ public void ensureAtLeastOneField(ComplexWriter writer) {
     }
   }
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field,
-      String msg, Object... args) {
-    return null;
-  }
-
-  public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) {
-    return null;
-  }
-
   private void ensure(final int length) {
     workBuf = workBuf.reallocIfNeeded(length);
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("BsonRecordReader[");
+    if (reader != null) {
+      sb.append("Name=")
+          .append(reader.getCurrentName())
+          .append(", Type=")
+          .append(reader.getCurrentBsonType());
+    }
+    sb.append(']');
+    return sb.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 7a4e4a2a618..379e2c93e2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -103,4 +103,12 @@ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
   public int getOperatorType() {
     return formatPlugin.getWriterOperatorType();
   }
+
+  @Override
+  public String toString() {
+    return "EasyWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4b8bbf89064..62ace663bdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -104,7 +104,7 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp
         "One of inputPath or embeddedContent must be set but not both."
         );
 
-    if(inputPath != null) {
+    if (inputPath != null) {
       this.hadoopPath = new Path(inputPath);
     } else {
       this.embeddedContent = embeddedContent;
@@ -126,9 +126,11 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp
   public String toString() {
     return super.toString()
         + "[hadoopPath = " + hadoopPath
+        + ", currentRecord=" + currentRecordNumberInFile()
+        + ", jsonReader=" + jsonReader
         + ", recordCount = " + recordCount
         + ", parseErrorCount = " + parseErrorCount
-         + ", runningRecordCount = " + runningRecordCount + ", ...]";
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
   }
 
   @Override
@@ -162,9 +164,9 @@ public void setup(final OperatorContext context, final OutputMutator output) thr
   }
 
   private void setupParser() throws IOException {
-    if(hadoopPath != null){
+    if (hadoopPath != null) {
       jsonReader.setSource(stream);
-    }else{
+    } else {
       jsonReader.setSource(embeddedContent);
     }
     jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
@@ -182,7 +184,7 @@ protected void handleAndRaise(String suffix, Exception e) throws UserException {
     }
 
     UserException.Builder exceptionBuilder = UserException.dataReadError(e)
-            .message("%s - %s", suffix, message);
+        .message("%s - %s", suffix, message);
     if (columnNr > 0) {
       exceptionBuilder.pushContext("Column ", columnNr);
     }
@@ -205,36 +207,32 @@ public int next() {
     writer.reset();
     recordCount = 0;
     parseErrorCount = 0;
-    if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+    if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
       return recordCount;
     }
-    outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
-      try{
+    while (recordCount < DEFAULT_ROWS_PER_BATCH) {
+      try {
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
-        if(write == ReadState.WRITE_SUCCEED){
+        if (write == ReadState.WRITE_SUCCEED) {
           recordCount++;
-        }
-        else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-          if(skipMalformedJSONRecords == false){
-            handleAndRaise("Error parsing JSON", new Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1)));
+        } else if (write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+          if (!skipMalformedJSONRecords) {
+            handleAndRaise("Error parsing JSON", new Exception());
           }
           ++parseErrorCount;
-          if(printSkippedMalformedJSONRecordLineNumber){
-            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount+parseErrorCount));
+          if (printSkippedMalformedJSONRecordLineNumber) {
+            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount));
           }
-          if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
-            break outside;
+          if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+            break;
           }
+        } else {
+          break;
         }
-        else{
-          break outside;
-        }
+      } catch (IOException ex) {
+        handleAndRaise("Error parsing JSON", ex);
       }
-      catch(IOException ex)
-        {
-           handleAndRaise("Error parsing JSON", ex);
-        }
     }
     // Skip empty json file with 0 row.
     // Only when data source has > 0 row, ensure the batch has one field.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index fba80e50782..adab033f45f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -27,7 +27,7 @@
 
 public interface JsonProcessor {
 
-  public static enum ReadState {
+  enum ReadState {
     END_OF_STREAM,
     JSON_RECORD_PARSE_ERROR,
     JSON_RECORD_PARSE_EOF_ERROR,
@@ -41,17 +41,11 @@
 
   void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
 
-  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String message);
 
-  public UserException.Builder getExceptionWithContext(Throwable exception,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args);
+  UserException.Builder getExceptionWithContext(Throwable exception, String message);
 
-  public boolean ignoreJSONParseError();
+  boolean ignoreJSONParseError();
 
-  public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
+  void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index aaa74ae6684..48a1464a935 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -24,6 +24,7 @@
 
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
 
+import com.fasterxml.jackson.core.JsonLocation;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -49,8 +50,12 @@
 
   protected JsonParser parser;
   protected DrillBuf workBuf;
-  protected JsonToken lastSeenJsonToken = null;
-  boolean ignoreJSONParseErrors = false; // default False
+  protected JsonToken lastSeenJsonToken;
+  boolean ignoreJSONParseErrors;
+  /**
+   * The name of the current field being parsed. Used in error messages.
+   */
+  protected String currentFieldName;
 
   /**
    *
@@ -90,26 +95,31 @@ public void setSource(JsonNode node) {
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(
-      UserException.Builder exceptionBuilder, String field, String msg,
-      Object... args) {
-    if (msg != null) {
-      exceptionBuilder.message(msg, args);
-    }
-    if (field != null) {
-      exceptionBuilder.pushContext("Field ", field);
+  public String toString() {
+    JsonLocation location = parser.getCurrentLocation();
+    return getClass().getSimpleName() + "[Line=" + location.getLineNr()
+        + ", Column=" + (location.getColumnNr() + 1)
+        + ", Field=" + getCurrentField()
+        + "]";
+  }
+
+  @Override
+  public UserException.Builder getExceptionWithContext(UserException.Builder builder, String message) {
+    builder.message(message);
+    JsonLocation location = parser.getCurrentLocation();
+    builder.addContext("Line", location.getLineNr())
+        .addContext("Column", location.getColumnNr() + 1);
+    String fieldName = getCurrentField();
+    if (fieldName != null) {
+      builder.addContext("Field", fieldName);
     }
-    exceptionBuilder.pushContext("Column ",
-        parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ",
-        parser.getCurrentLocation().getLineNr());
-    return exceptionBuilder;
+    return builder;
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(Throwable e,
-      String field, String msg, Object... args) {
+  public UserException.Builder getExceptionWithContext(Throwable e, String message) {
     UserException.Builder exceptionBuilder = UserException.dataReadError(e);
-    return getExceptionWithContext(exceptionBuilder, field, msg, args);
+    return getExceptionWithContext(exceptionBuilder, message);
   }
 
   /*
@@ -138,4 +148,8 @@ protected JsonExceptionProcessingState processJSONException()
     }
     return JsonExceptionProcessingState.PROC_SUCCEED;
   }
+
+  protected String getCurrentField() {
+    return currentFieldName;
+  }
 }
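
With the simplified two-argument form, the position context (Line, Column and the current Field)
is attached inside getExceptionWithContext() itself, so callers now pass only a message, as the
JsonReader hunks later in this diff show. A hedged usage sketch of a caller holding a
JsonProcessor such as jsonReader (the error text is illustrative):

    try {
      ReadState state = jsonReader.write(writer);
      // ... handle the returned ReadState ...
    } catch (IOException e) {
      // Line, Column and Field context are appended by getExceptionWithContext()
      throw jsonReader.getExceptionWithContext(e, "Failure while reading JSON.")
          .build(logger);
    }
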
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index a5e9f1afdaa..0f92ec52ade 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,8 +23,6 @@
 
 import io.netty.buffer.DrillBuf;
 
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
 public class CountingJsonReader extends BaseJsonProcessor {
@@ -41,18 +39,14 @@ public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
         token = parser.nextToken();
       }
       lastSeenJsonToken = null;
+      if (token == JsonToken.FIELD_NAME) {
+        currentFieldName = parser.getText();
+      }
       if (!parser.hasCurrentToken()) {
         return ReadState.END_OF_STREAM;
       } else if (token != JsonToken.START_OBJECT) {
         throw new com.fasterxml.jackson.core.JsonParseException(
-            parser,
-            String
-                .format(
-                    "Cannot read from the middle of a record. Current token was %s ",
-                    token));
-        // throw new
-        // IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s",
-        // token));
+            parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
       }
       writer.rootAsMap().bit("count").writeBit(1);
       parser.skipChildren();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
index 3d88b1ab29c..549df82773a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
@@ -163,4 +163,19 @@ public void close() {
       logger.warn("Exception closing reader: {}", e);
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    long position = -1L;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position: {}", e.getMessage());
+    }
+    return "SequenceFileRecordReader[File=" + split.getPath()
+        + ", Position=" + position
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 9a1d486af05..7aa9b0484d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -216,7 +216,7 @@ public int next() {
     } catch (IOException | TextParsingException e) {
       throw UserException.dataReadError(e)
           .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
-            split.getPath(), reader.getPos())
+              split.getPath(), reader.getPos())
           .build(logger);
     }
   }
@@ -248,4 +248,11 @@ public void close() {
       logger.warn("Exception while closing stream.", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "CompliantTextRecordReader[File=" + split.getPath()
+        + ", reader=" + reader
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
index a181d427064..7a9ed4655ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -500,4 +500,12 @@ public void close() throws IOException{
     input.close();
   }
 
+  @Override
+  public String toString() {
+    return "TextReader[Line=" + context.currentLine()
+        + ", Column=" + context.currentChar()
+        + ", Record=" + context.currentRecord()
+        + ", Byte pos=" + getPos()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 18437df94ff..f43bb881302 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -242,6 +242,14 @@ public void close() throws Exception {
       }
     }
 
+    @Override
+    public String toString() {
+      return "HttpdLogRecordReader[Path=" + work.getPath()
+          + ", Start=" + work.getStart()
+          + ", Length=" + work.getLength()
+          + ", Line=" + lineNumber.get()
+          + "]";
+    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 91f8b991c98..2a4b4fb4803 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -261,7 +261,7 @@ private void processDirectory(final MapWriter writer, final Directory directory,
   }
 
   private void processXmpDirectory(final MapWriter writer, final XmpDirectory directory) {
-    HashSet<String> listItems = new HashSet();
+    HashSet<String> listItems = new HashSet<>();
     XMPMeta xmpMeta = directory.getXMPMeta();
     if (xmpMeta != null) {
       try {
@@ -490,4 +490,9 @@ public void close() throws Exception {
       metadataStream.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return "ImageRecordReader[Path=" + hadoopPath.toUri().getPath() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index 56bc1cc2d25..e5d1dc438a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -761,4 +761,11 @@ public void close() {
       reader = null;
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    return "LogRecordReader[File=" + fileWork.getPath()
+        + ", Line=" + rowIndex
+        + "]";
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 1cd393becc1..aea3218592d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -121,4 +121,11 @@ public int getOperatorType() {
     return CoreOperatorType.PARQUET_WRITER_VALUE;
   }
 
+  @Override
+  public String toString() {
+    return "ParquetWriter[location=" + location
+        + ", storageStrategy=" + getStorageStrategy()
+        + ", partitionColumns=" + partitionColumns
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2ada17d9b72..17cf8c44dfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -348,4 +348,14 @@ private int initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, Parqu
       return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
     }
   }
+
+  @Override
+  public String toString() {
+    return "ParquetRecordReader[File=" + hadoopPath.toUri()
+        + ", Row group index=" + rowGroupIndex
+        + ", Records in row group=" + footer.getBlocks().get(rowGroupIndex).getRowCount()
+        + ", Total records read=" + (readState != null ? readState.recordsRead() : -1)
+        + ", Metadata" + footer
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0c69d6de51a..7108ca6727b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -344,4 +344,9 @@ public void close() {
       this.type = type;
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillParquetReader[pageReadStore=" + pageReadStore + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 794687f6501..d688f3b07c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -396,4 +396,9 @@ private void setStringColumnValue(final String data, final ProjectedColumnInfo p
           .setSafe(count, value, 0, value.remaining());
     }
   }
+
+  @Override
+  public String toString() {
+    return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index d2cd9a8afcc..0db17fbb5f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -232,4 +232,13 @@ public void close() {
       logger.warn("Exception closing reader: {}", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "DrillTextRecordReader[File=" + split.getPath()
+        + ", Record=" + (totalRecordsRead + 1)
+        + ", Start=" + split.getStart()
+        + ", Length=" + split.getLength()
+        + "]";
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 2810c04dd47..aaa9806a4d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -70,10 +70,6 @@
    * outer list.
    */
   private boolean inOuterList;
-  /**
-   * The name of the current field being parsed. For Error messages.
-   */
-  private String currentFieldName;
 
   private FieldSelection selection;
 
@@ -214,10 +210,9 @@ public ReadState write(ComplexWriter writer) throws IOException {
       case WRITE_SUCCEED:
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while reading JSON. (Got an invalid read state %s )",
-            readState.toString()).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+            .build(logger);
       }
     } catch (com.fasterxml.jackson.core.JsonParseException ex) {
       if (ignoreJSONParseError()) {
@@ -236,13 +231,10 @@ public ReadState write(ComplexWriter writer) throws IOException {
   private void confirmLast() throws IOException {
     parser.nextToken();
     if (!parser.isClosed()) {
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
-          .message(
-              "Drill attempted to unwrap a toplevel list "
-                  + "in your document.  However, it appears that there is trailing content after this top level list.  Drill only "
-                  + "supports querying a set of distinct maps or a single json array with multiple inner maps.")
-          .build(logger);
+      String message = "Drill attempted to unwrap a toplevel list in your document. "
+          + "However, it appears that there is trailing content after this top level list.  Drill only "
+          + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+      throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
     }
   }
 
@@ -255,11 +247,9 @@ private ReadState writeToVector(ComplexWriter writer, JsonToken t)
       break;
     case START_ARRAY:
       if (inOuterList) {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null)
-            .message(
-                "The top level of your document must either be a single array of maps or a set "
-                    + "of white space delimited maps.").build(logger);
+        String message = "The top level of your document must either be a single array of maps or a set "
+            + "of white space delimited maps.";
+        throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
       }
 
       if (skipOuterList) {
@@ -268,11 +258,9 @@ private ReadState writeToVector(ComplexWriter writer, JsonToken t)
           inOuterList = true;
           writeDataSwitch(writer.rootAsMap());
         } else {
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null)
-              .message(
-                  "The top level of your document must either be a single array of maps or a set "
-                      + "of white space delimited maps.").build(logger);
+          String message = "The top level of your document must either be a single array of maps or a set "
+              + "of white space delimited maps.";
+          throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
         }
 
       } else {
@@ -285,17 +273,14 @@ private ReadState writeToVector(ComplexWriter writer, JsonToken t)
         confirmLast();
         return ReadState.END_OF_STREAM;
       } else {
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message(
-            "Failure while parsing JSON.  Ran across unexpected %s.",
-            JsonToken.END_ARRAY).build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
       }
 
     case NOT_AVAILABLE:
       return ReadState.END_OF_STREAM;
     default:
-      throw getExceptionWithContext(UserException.dataReadError(),
-          currentFieldName, null)
+      throw getExceptionWithContext(UserException.dataReadError(), null)
           .message(
               "Failure while parsing JSON.  Found token of [%s].  Drill currently only supports parsing "
                   + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
@@ -412,9 +397,9 @@ private void writeData(MapWriter map, FieldSelection selection,
           break;
 
         default:
-          throw getExceptionWithContext(UserException.dataReadError(),
-              currentFieldName, null).message("Unexpected token %s",
-              parser.getCurrentToken()).build(logger);
+          throw getExceptionWithContext(UserException.dataReadError(), null)
+              .message("Unexpected token %s", parser.getCurrentToken())
+              .build(logger);
         }
 
       }
@@ -478,8 +463,7 @@ private void writeDataAllText(MapWriter map, FieldSelection selection,
         break;
 
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
@@ -591,8 +575,7 @@ private void writeData(ListWriter list) throws IOException {
               .build(logger);
         }
       } catch (Exception e) {
-        throw getExceptionWithContext(e, this.currentFieldName, null).build(
-            logger);
+        throw getExceptionWithContext(e, null).build(logger);
       }
     }
     list.endList();
@@ -637,8 +620,7 @@ private void writeDataAllText(ListWriter list) throws IOException {
         handleString(parser, list);
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(),
-            currentFieldName, null).message("Unexpected token %s",
+        throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
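
All of the JsonReader changes above serve one refactoring: the per-field name
that used to be threaded through every error path is gone, and
getExceptionWithContext(exception, message) now takes only an optional
message. A hedged sketch of what such a two-argument helper might look like
(the real implementation lives in Drill's JSON reader hierarchy and may
differ; JsonParser is Jackson's streaming parser):

    // Illustrative only: wrap a failure in a UserException builder and
    // attach the parser's position as context instead of a field name.
    private UserException.Builder getExceptionWithContext(
        UserException.Builder builder, String message) {
      if (message != null) {
        builder.message(message);
      }
      // Positional context remains useful even without the field name.
      builder.addContext("Line: " + parser.getCurrentLocation().getLineNr()
          + ", Column: " + parser.getCurrentLocation().getColumnNr());
      return builder;
    }
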
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7cb07eb6421..a9e9e62eccf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -207,6 +207,10 @@ public void cancel() {
   }
 
   private void cleanup(FragmentState state) {
+    if (fragmentState.get() == FragmentState.FAILED) {
+      root.dumpBatches();
+    }
+
     closeOutResources();
 
     updateState(state);
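
This cleanup hook is the trigger for the whole feature: once the fragment has
transitioned to FAILED, the root operator is asked to dump its batch tree to
the logs before resources are closed. A sketch of how a dumpBatches()
implementation could cascade over the operators (the incomingBatches field is
hypothetical; the real wiring differs per root operator):

    // Illustrative sketch: log the state of every incoming batch that
    // reports failure, bracketed by the start/complete markers that the
    // TestOperatorDump test below asserts on.
    @Override
    public void dumpBatches() {
      logger.error("Batch dump started: dumping state of the failed operators");
      for (RecordBatch batch : incomingBatches) {
        if (batch.hasFailed()) {
          batch.dump();  // each batch logs its own toString()-style summary
        }
      }
      logger.error("Batch dump completed");
    }
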
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 89731ff2a4e..4bb1a22eef9 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -269,4 +269,9 @@ public PageReader getPageReader(ColumnDescriptor descriptor) {
   public long getRowCount() {
     return rowCount;
   }
+
+  @Override
+  public String toString() {
+    return "ColumnChunkIncReadStore[File=" + path.toUri() + "]";
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
new file mode 100644
index 00000000000..18ba61cf05e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOperatorDump extends ClusterTest {
+
+  private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed";
+  private static final String ENTRY_DUMP_STARTED = "Batch dump started";
+
+  private LogFixture logFixture;
+  private EventAwareContextAppender appender;
+
+  @BeforeClass
+  public static void setupFiles() {
+    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+  }
+
+  @Before
+  public void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    appender = new EventAwareContextAppender();
+    logFixture = LogFixture.builder()
+        .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
+        .build();
+    startCluster(builder);
+  }
+
+  @After
+  public void tearDown() {
+    logFixture.close();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testScanBatchChecked() throws Exception {
+    String exceptionDesc = "next-allocate";
+    final String controls = Controls.newBuilder()
+        .addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select * from dfs.`multilevel/parquet` limit 100";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(exceptionDesc));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ScanBatch.class.getName());
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testExternalSortUnchecked() throws Exception {
+    Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class;
+    final String controls = Controls.newBuilder()
+        .addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class)
+        .build();
+    ControlsInjectionUtil.setControls(client.client(), controls);
+    String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
+    try {
+      client.queryBuilder().sql(query).run();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT));
+
+      String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+      validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName());
+      throw e;
+    }
+  }
+
+  private void validateContainsEntries(String[] entries, String expectedClassName) {
+    if (entries == null) {
+      entries = new String[0];
+    }
+    List<String> messages = appender.getMessages();
+    List<String> entryList = new ArrayList<>(entries.length);
+    Collections.addAll(entryList, entries);
+    Iterator<String> it = entryList.iterator();
+    while (it.hasNext()) {
+      String entry = it.next();
+      for (String message : messages) {
+        if (message.contains(entry)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+    assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty());
+
+    Set<String> loggerNames = appender.getLoggerNames();
+    assertTrue(String.format("Entry for class %s was not found", expectedClassName),
+        loggerNames.contains(expectedClassName));
+  }
+
+  // ConsoleAppender which stores logged events
+  private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> {
+
+    private List<ILoggingEvent> events = new ArrayList<>();
+
+    @Override
+    protected void append(ILoggingEvent e) {
+      events.add(e);
+    }
+
+    List<String> getMessages() {
+      return events.stream()
+          .map(ILoggingEvent::getMessage)
+          .collect(Collectors.toList());
+    }
+
+    Set<String> getLoggerNames() {
+      return events.stream()
+          .map(ILoggingEvent::getLoggerName)
+          .collect(Collectors.toSet());
+    }
+  }
+}
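
The EventAwareContextAppender above buffers events rather than printing them;
it extends ConsoleAppender only because LogFixture's new toConsole(appender,
format) overload expects one. Where that constraint does not apply, logback's
stock ch.qos.logback.core.read.ListAppender could serve the same purpose:

    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.read.ListAppender;

    // ListAppender keeps every event in its public 'list' field.
    ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
    listAppender.start();
    // ... attach to the logger under test, run the failing query, then:
    boolean sawDump = listAppender.list.stream()
        .map(ILoggingEvent::getFormattedMessage)
        .anyMatch(m -> m.contains("Batch dump started"));
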
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 34d735e764e..94e0c0ec7c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -288,6 +288,15 @@ public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
     this.limitWithUnnest = limitWithUnnest;
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   public static class Builder {
     private final List<RowSet> rowSets = new ArrayList<>();
     private final List<IterOutcome> iterOutcomes = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 6d5b66624e3..a1766460c3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -105,6 +105,11 @@ public boolean next() {
     }
   }
 
+  @Override
+  public void dumpBatches() {
+    screenRoot.dumpBatches();
+  }
+
   @Override
   public void close() throws Exception {
     screenRoot.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index c7105f9c492..aefa28a0c13 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -162,6 +162,15 @@ public void close() throws Exception {
 
   }
 
+  @Override
+  public boolean hasFailed() {
+    return false;
+  }
+
+  @Override
+  public void dump() {
+  }
+
   @Override public int getRecordCount() {
     return 0;
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 6f1e6e0b255..c05cdfdf8ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -121,6 +121,15 @@ public WritableBatch getWritableBatch() {
     public Iterator<VectorWrapper<?>> iterator() {
       return null;
     }
+
+    @Override
+    public void dump() {
+    }
+
+    @Override
+    public boolean hasFailed() {
+      return false;
+    }
   }
 
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b54b0b09950..b62a18805d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -86,6 +86,7 @@ public LogSpec(String loggerName, Level level) {
     private String consoleFormat = DEFAULT_CONSOLE_FORMAT;
     private boolean logToConsole;
     private List<LogSpec> loggers = new ArrayList<>();
+    private ConsoleAppender<ILoggingEvent> appender;
 
     /**
      * Send all enabled logging to the console (if not already configured.) Some
@@ -102,6 +103,11 @@ public LogFixtureBuilder toConsole() {
       return this;
     }
 
+    public LogFixtureBuilder toConsole(ConsoleAppender<ILoggingEvent> appender, String format) {
+      this.appender = appender;
+      return toConsole(format);
+    }
+
     /**
      * Send logging to the console using the defined format.
      *
@@ -195,7 +201,7 @@ public static LogFixtureBuilder builder() {
 
   private void setupConsole(LogFixtureBuilder builder) {
     drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
-    if (drillLogger.getAppender("STDOUT") != null) {
+    if (builder.appender == null && drillLogger.getAppender("STDOUT") != null) {
       return;
     }
     LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
@@ -204,10 +210,10 @@ private void setupConsole(LogFixtureBuilder builder) {
     ple.setContext(lc);
     ple.start();
 
-    appender = new ConsoleAppender<>( );
+    appender = builder.appender == null ? new ConsoleAppender<>() : builder.appender;
     appender.setContext(lc);
     appender.setName("Console");
-    appender.setEncoder( ple );
+    appender.setEncoder(ple);
     appender.start();
 
     Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
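
Typical usage of the new overload, mirroring the setup in TestOperatorDump:

    // Supply a custom appender so the test can inspect what was logged.
    EventAwareContextAppender appender = new EventAwareContextAppender();
    LogFixture logFixture = LogFixture.builder()
        .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
        .build();
    try {
      // run the query expected to fail and dump its batches
    } finally {
      logFixture.close();  // restores the previous logging configuration
    }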


 
