You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/10/01 12:35:39 UTC
[drill] 01/02: DRILL-6724: Dump operator context to logs when error
occurs during query execution
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 98e5de3b5af862779244bac8329852b3c9a901df
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Fri Aug 31 18:20:34 2018 +0300
DRILL-6724: Dump operator context to logs when error occurs during query execution
closes #1455
---
.../drill/common/exceptions/UserException.java | 4 +-
.../common/exceptions/UserExceptionContext.java | 19 ++-
.../store/mapr/db/json/MaprDBJsonRecordReader.java | 17 ++-
.../drill/exec/store/hbase/HBaseRecordReader.java | 5 +
.../store/hive/readers/HiveAbstractReader.java | 15 ++
.../drill/exec/store/jdbc/JdbcRecordReader.java | 8 +-
.../drill/exec/store/kafka/KafkaRecordReader.java | 26 +++-
.../store/kafka/decoders/JsonMessageReader.java | 4 +
.../drill/exec/store/kudu/KuduRecordReader.java | 11 ++
.../apache/drill/exec/store/kudu/KuduWriter.java | 5 +
.../drill/exec/store/mongo/MongoRecordReader.java | 5 +
.../drill/exec/physical/config/HashAggregate.java | 8 ++
.../physical/config/OrderedPartitionSender.java | 11 ++
.../drill/exec/physical/config/PartitionLimit.java | 5 +
.../apache/drill/exec/physical/config/Sort.java | 7 +
.../apache/drill/exec/physical/config/TopN.java | 7 +
.../drill/exec/physical/config/WindowPOP.java | 16 +++
.../drill/exec/physical/impl/BaseRootExec.java | 23 +++
.../apache/drill/exec/physical/impl/RootExec.java | 6 +
.../apache/drill/exec/physical/impl/ScanBatch.java | 30 +++-
.../drill/exec/physical/impl/TopN/TopNBatch.java | 9 +-
.../exec/physical/impl/WriterRecordBatch.java | 6 +
.../exec/physical/impl/aggregate/HashAggBatch.java | 9 ++
.../physical/impl/aggregate/HashAggTemplate.java | 9 ++
.../impl/aggregate/SpilledRecordbatch.java | 35 ++++-
.../physical/impl/aggregate/StreamingAggBatch.java | 6 +
.../impl/aggregate/StreamingAggTemplate.java | 10 ++
.../physical/impl/filter/FilterRecordBatch.java | 8 ++
.../exec/physical/impl/filter/FilterTemplate2.java | 7 +
.../exec/physical/impl/filter/FilterTemplate4.java | 6 +
.../impl/filter/RuntimeFilterRecordBatch.java | 9 +-
.../physical/impl/flatten/FlattenRecordBatch.java | 6 +
.../physical/impl/flatten/FlattenTemplate.java | 11 ++
.../exec/physical/impl/join/HashJoinBatch.java | 6 +
.../physical/impl/join/HashJoinProbeTemplate.java | 13 ++
.../exec/physical/impl/join/LateralJoinBatch.java | 8 ++
.../exec/physical/impl/join/MergeJoinBatch.java | 7 +
.../physical/impl/join/NestedLoopJoinBatch.java | 8 ++
.../exec/physical/impl/limit/LimitRecordBatch.java | 74 +++++-----
.../impl/limit/PartitionLimitRecordBatch.java | 10 +-
.../impl/mergereceiver/MergingRecordBatch.java | 8 ++
.../OrderedPartitionRecordBatch.java | 6 +
.../impl/producer/ProducerConsumerBatch.java | 5 +
.../physical/impl/project/ProjectRecordBatch.java | 6 +
.../physical/impl/project/ProjectorTemplate.java | 6 +
.../impl/protocol/OperatorRecordBatch.java | 18 ++-
.../drill/exec/physical/impl/sort/SortBatch.java | 4 +
.../exec/physical/impl/sort/SortTemplate.java | 4 +
.../impl/svremover/RemovingRecordBatch.java | 5 +
.../exec/physical/impl/trace/TraceRecordBatch.java | 6 +-
.../physical/impl/union/UnionAllRecordBatch.java | 5 +
.../exec/physical/impl/unnest/UnnestImpl.java | 10 ++
.../physical/impl/unnest/UnnestRecordBatch.java | 7 +-
.../unorderedreceiver/UnorderedReceiverBatch.java | 36 +++--
.../validate/IteratorValidatorBatchIterator.java | 13 +-
.../physical/impl/window/FrameSupportTemplate.java | 11 ++
.../impl/window/NoFrameSupportTemplate.java | 11 ++
.../exec/physical/impl/window/WindowDataBatch.java | 5 +
.../impl/window/WindowFrameRecordBatch.java | 7 +
.../physical/impl/xsort/ExternalSortBatch.java | 4 +
.../exec/physical/impl/xsort/MSortTemplate.java | 9 ++
.../impl/xsort/SingleBatchSorterTemplate.java | 5 +
.../impl/xsort/managed/ExternalSortBatch.java | 7 +
.../physical/impl/xsort/managed/SortConfig.java | 9 ++
.../exec/physical/impl/xsort/managed/SortImpl.java | 8 ++
.../drill/exec/record/AbstractRecordBatch.java | 50 +++++--
.../record/AbstractTableFunctionRecordBatch.java | 1 -
.../org/apache/drill/exec/record/RecordBatch.java | 15 ++
.../drill/exec/record/RecordBatchLoader.java | 8 ++
.../apache/drill/exec/record/RecordIterator.java | 14 ++
.../apache/drill/exec/record/SchemalessBatch.java | 10 ++
.../drill/exec/record/SimpleRecordBatch.java | 13 ++
.../exec/record/selection/SelectionVector4.java | 9 ++
.../drill/exec/store/AbstractRecordReader.java | 1 -
.../org/apache/drill/exec/store/RecordReader.java | 4 +-
.../apache/drill/exec/store/StorageStrategy.java | 7 +-
.../drill/exec/store/avro/AvroRecordReader.java | 15 +-
.../drill/exec/store/bson/BsonRecordReader.java | 25 ++--
.../drill/exec/store/dfs/easy/EasyWriter.java | 8 ++
.../exec/store/easy/json/JSONRecordReader.java | 46 +++---
.../drill/exec/store/easy/json/JsonProcessor.java | 16 +--
.../store/easy/json/reader/BaseJsonProcessor.java | 48 ++++---
.../store/easy/json/reader/CountingJsonReader.java | 14 +-
.../sequencefile/SequenceFileRecordReader.java | 17 ++-
.../text/compliant/CompliantTextRecordReader.java | 9 +-
.../exec/store/easy/text/compliant/TextReader.java | 8 ++
.../exec/store/httpd/HttpdLogFormatPlugin.java | 8 ++
.../drill/exec/store/image/ImageRecordReader.java | 7 +-
.../drill/exec/store/log/LogRecordReader.java | 9 +-
.../drill/exec/store/parquet/ParquetWriter.java | 7 +
.../parquet/columnreaders/ParquetRecordReader.java | 10 ++
.../exec/store/parquet2/DrillParquetReader.java | 5 +
.../drill/exec/store/pcap/PcapRecordReader.java | 5 +
.../exec/store/text/DrillTextRecordReader.java | 9 ++
.../drill/exec/vector/complex/fn/JsonReader.java | 62 +++-----
.../drill/exec/work/fragment/FragmentExecutor.java | 4 +
.../parquet/hadoop/ColumnChunkIncReadStore.java | 5 +
.../java/org/apache/drill/TestOperatorDump.java | 159 +++++++++++++++++++++
.../drill/exec/physical/impl/MockRecordBatch.java | 9 ++
.../drill/exec/physical/impl/SimpleRootExec.java | 5 +
.../physical/impl/unnest/MockLateralJoinBatch.java | 9 ++
.../drill/exec/work/filter/BloomFilterTest.java | 9 ++
.../java/org/apache/drill/test/LogFixture.java | 12 +-
103 files changed, 1165 insertions(+), 211 deletions(-)
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 56cb935..3a3ca5a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -771,8 +771,10 @@ public class UserException extends DrillRuntimeException {
* @return generated user error message
*/
private String generateMessage(boolean includeErrorIdAndIdentity) {
+ boolean seeLogsMessage = errorType == DrillPBError.ErrorType.INTERNAL_ERROR
+ || errorType == DrillPBError.ErrorType.SYSTEM;
return errorType + " ERROR: " + super.getMessage() + "\n\n" +
- context.generateContextMessage(includeErrorIdAndIdentity);
+ context.generateContextMessage(includeErrorIdAndIdentity, seeLogsMessage);
}
}
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
index fa2c437..271462a 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.proto.CoordinationProtos;
*/
class UserExceptionContext {
+ private static final String NEW_LINE = System.lineSeparator();
+
private final String errorId;
private final List<String> contextList;
@@ -133,17 +135,26 @@ class UserExceptionContext {
* generate a context message
* @return string containing all context information concatenated
*/
- String generateContextMessage(boolean includeErrorIdAndIdentity) {
+ String generateContextMessage(boolean includeErrorIdAndIdentity, boolean includeSeeLogsMessage) {
StringBuilder sb = new StringBuilder();
for (String context : contextList) {
- sb.append(context).append("\n");
+ sb.append(context)
+ .append(NEW_LINE);
+ }
+
+ if (includeSeeLogsMessage) {
+ sb.append(NEW_LINE)
+ .append("Please, refer to logs for more information.")
+ .append(NEW_LINE);
}
if (includeErrorIdAndIdentity) {
// add identification infos
- sb.append("\n[Error Id: ");
- sb.append(errorId).append(" ");
+ sb.append(NEW_LINE)
+ .append("[Error Id: ")
+ .append(errorId)
+ .append(" ");
if (endpoint != null) {
sb.append("on ")
.append(endpoint.getAddress())
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 3c7ca8e..b68f574 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final String tableName;
private OperatorContext operatorContext;
private VectorContainerWriter vectorWriter;
+ private DBDocumentReaderBase reader;
private DrillBuf buffer;
@@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
vectorWriter.reset();
int recordCount = 0;
- DBDocumentReaderBase reader = null;
+ reader = null;
while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
vectorWriter.setPosition(recordCount);
@@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
table.close();
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=")
+ .append(table.getPath());
+ if (reader != null) {
+ sb.append(", Document ID=")
+ .append(IdCodec.asString(reader.getId()));
+ }
+ sb.append(", reader=")
+ .append(reader)
+ .append(']');
+ return sb.toString();
+ }
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 86038c4..2db1d02 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
return rowCount < TARGET_RECORD_COUNT &&
operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
}
+
+ @Override
+ public String toString() {
+ return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]";
+ }
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
index 354a61e..ba1cd30 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
}
}
+ @Override
+ public String toString() {
+ long position = -1;
+ try {
+ if (reader != null) {
+ position = reader.getPos();
+ }
+ } catch (IOException e) {
+ logger.trace("Unable to obtain reader position: " + e.getMessage());
+ }
+ return getClass().getSimpleName() + "[Database=" + table.getDbName()
+ + ", Table=" + table.getTableName()
+ + ", Position=" + position
+ + "]";
+ }
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 1b6e211..cd732a6 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader {
AutoCloseables.close(resultSet, statement, connection);
}
+ @Override
+ public String toString() {
+ return "JdbcRecordReader[sql=" + sql
+ + ", Plugin=" + storagePluginName
+ + "]";
+ }
+
private abstract class Copier<T extends ValueVector.Mutator> {
protected final int columnIndex;
protected final ResultSet result;
@@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader {
}
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index d715ada..9559c3d 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
private final boolean enableAllTextMode;
private final boolean readNumbersAsDouble;
private final String kafkaMsgReader;
+ private int currentMessageCount;
public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, KafkaStoragePlugin plugin) {
@@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader {
writer.allocate();
writer.reset();
Stopwatch watch = Stopwatch.createStarted();
- int messageCount = 0;
+ currentMessageCount = 0;
try {
while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
currentOffset = consumerRecord.offset();
- writer.setPosition(messageCount);
+ writer.setPosition(currentMessageCount);
messageReader.readMessage(consumerRecord);
- if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+ if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
break;
}
}
messageReader.ensureAtLeastOneField();
- writer.setValueCount(messageCount);
- logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+ writer.setValueCount(currentMessageCount);
+ logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
- return messageCount;
+ return currentMessageCount;
} catch (Exception e) {
- String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+ String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
}
}
@@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader {
messageReader.close();
}
+ @Override
+ public String toString() {
+ return "KafkaRecordReader[messageReader=" + messageReader
+ + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+ + ", currentOffset=" + currentOffset
+ + ", enableAllTextMode=" + enableAllTextMode
+ + ", readNumbersAsDouble=" + readNumbersAsDouble
+ + ", kafkaMsgReader=" + kafkaMsgReader
+ + ", currentMessageCount=" + currentMessageCount
+ + "]";
+ }
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a62357d..40e9e12 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader {
}
}
+ @Override
+ public String toString() {
+ return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+ }
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 845738c..976b16d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader {
private OutputMutator output;
private OperatorContext context;
+ private String lastColumnName;
+ private Type lastColumnType;
+
private static class ProjectedColumnInfo {
int index;
ValueVector vv;
@@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader {
final String name = col.getName();
final Type kuduType = col.getType();
+ lastColumnName = name;
+ lastColumnType = kuduType;
MinorType minorType = TYPES.get(kuduType);
if (minorType == null) {
logger.warn("Ignoring column that is unsupported.", UserException
@@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader {
public void close() {
}
+ @Override
+ public String toString() {
+ return "KuduRecordReader[Column=" + lastColumnName
+ + ", Type=" + lastColumnType
+ + "]";
+ }
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index d0fa158..7611576 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter {
public KuduStoragePlugin getPlugin() {
return plugin;
}
+
+ @Override
+ public String toString() {
+ return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]";
+ }
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index a79e39a..f5d1f2e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader {
public void close() {
}
+ @Override
+ public String toString() {
+ Object reader = isBsonRecordReader ? bsonReader : jsonReader;
+ return "MongoRecordReader[reader=" + reader + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index da988de..521aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -106,4 +106,12 @@ public class HashAggregate extends AbstractSingle {
return queryContext == null ||
1 < (int) queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
}
+
+ @Override
+ public String toString() {
+ return "HashAggregate[groupByExprs=" + groupByExprs
+ + ", aggrExprs=" + aggrExprs
+ + ", cardinality=" + cardinality
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index d28d563..320bc6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -106,4 +106,15 @@ public class OrderedPartitionSender extends AbstractSender {
public int getOperatorType() {
return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE;
}
+
+ @Override
+ public String toString() {
+ return "OrderedPartitionSender[orderings=" + orderings
+ + ", ref=" + ref
+ + ", sendingWidth=" + sendingWidth
+ + ", recordsToSample=" + recordsToSample
+ + ", samplingFactor=" + samplingFactor
+ + ", completionFactor=" + completionFactor
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
index 29f8bb2..4ea710d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -59,4 +59,9 @@ public class PartitionLimit extends Limit {
public int getOperatorType() {
return CoreOperatorType.PARTITION_LIMIT_VALUE;
}
+
+ @Override
+ public String toString() {
+ return "PartitionLimit[partitionColumn=" + partitionColumn + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 85ef7da..5d65c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -71,4 +71,11 @@ public class Sort extends AbstractSingle{
public int getOperatorType() {
return CoreOperatorType.OLD_SORT_VALUE;
}
+
+ @Override
+ public String toString() {
+ return "Sort[orderings=" + orderings
+ + ", reverse=" + reverse
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
index 54d1b4d..d2a87d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
@@ -69,4 +69,11 @@ public class TopN extends Sort {
return CoreOperatorType.TOP_N_SORT_VALUE;
}
+ @Override
+ public String toString() {
+ return "TopN[orderings=" + orderings
+ + ", reverse=" + reverse
+ + ", limit=" + limit
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
index 543c09f..3ddaa7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -95,6 +95,17 @@ public class WindowPOP extends AbstractSingle {
return frameUnitsRows;
}
+ @Override
+ public String toString() {
+ return "WindowPOP[withins=" + withins
+ + ", aggregations=" + aggregations
+ + ", orderings=" + orderings
+ + ", frameUnitsRows=" + frameUnitsRows
+ + ", start=" + start
+ + ", end=" + end
+ + "]";
+ }
+
@JsonTypeName("windowBound")
public static class Bound {
private final boolean unbounded;
@@ -117,6 +128,11 @@ public class WindowPOP extends AbstractSingle {
public long getOffset() {
return offset;
}
+
+ @Override
+ public String toString() {
+ return "Bound[unbounded=" + unbounded + ", offset=" + offset + "]";
+ }
}
public static Bound newBound(RexWindowBound windowBound) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index e148278..9142a2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.util.LinkedList;
import java.util.List;
import org.apache.drill.common.DeferredException;
@@ -124,6 +125,28 @@ public abstract class BaseRootExec implements RootExec {
}
@Override
+ public void dumpBatches() {
+ final int numberOfBatchesToDump = 2;
+ logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
+ // As batches are stored in a 'flat' List there is a need to filter out the failed batch
+ // and a few of its parent (actual number of batches is set by a constant defined above)
+ List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
+ for (int i = operators.size() - 1; i >= 0; i--) {
+ CloseableRecordBatch batch = operators.get(i);
+ if (batch.hasFailed()) {
+ failedBatchStack.add(0, batch);
+ if (failedBatchStack.size() == numberOfBatchesToDump) {
+ break;
+ }
+ }
+ }
+ for (CloseableRecordBatch batch : failedBatchStack) {
+ batch.dump();
+ }
+ logger.error("Batch dump completed.");
+ }
+
+ @Override
public void close() throws Exception {
// We want to account for the time spent waiting here as Wait time in the operator profile
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index df0f89b..34f2131 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -44,4 +44,10 @@ public interface RootExec extends AutoCloseable {
* @param handle The handle pointing to the downstream receiver that does not need anymore data.
*/
void receivingFragmentFinished(FragmentHandle handle);
+
+ /**
+ * Dump failed batches' state preceded by its parent's state to logs. Invoked when there is a
+ * failure during fragment execution.
+ */
+ void dumpBatches();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cd24a4c..dc8dd0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,6 +83,10 @@ public class ScanBatch implements CloseableRecordBatch {
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
private final RecordBatchStatsContext batchStatsContext;
+ // Represents last outcome of next(). If an Exception is thrown
+ // during the method's execution a value IterOutcome.STOP will be assigned.
+ private IterOutcome lastOutcome;
+
/**
*
* @param context
@@ -160,7 +164,8 @@ public class ScanBatch implements CloseableRecordBatch {
@Override
public IterOutcome next() {
if (done) {
- return IterOutcome.NONE;
+ lastOutcome = IterOutcome.NONE;
+ return lastOutcome;
}
oContext.getStats().startProcessing();
try {
@@ -168,7 +173,8 @@ public class ScanBatch implements CloseableRecordBatch {
if (currentReader == null && !getNextReaderIfHas()) {
releaseAssets(); // All data has been read. Release resource.
done = true;
- return IterOutcome.NONE;
+ lastOutcome = IterOutcome.NONE;
+ return lastOutcome;
}
injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
currentReader.allocate(mutator.fieldVectorMap());
@@ -191,7 +197,8 @@ public class ScanBatch implements CloseableRecordBatch {
// This could happen when data sources have a non-trivial schema with 0 row.
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
- return IterOutcome.OK_NEW_SCHEMA;
+ lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+ return lastOutcome;
}
// Handle case of same schema.
@@ -199,11 +206,13 @@ public class ScanBatch implements CloseableRecordBatch {
continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
} else {
// return OK if recordCount > 0 && ! isNewSchema
- return IterOutcome.OK;
+ lastOutcome = IterOutcome.OK;
+ return lastOutcome;
}
}
} catch (OutOfMemoryException ex) {
clearFieldVectorMap();
+ lastOutcome = IterOutcome.STOP;
throw UserException.memoryError(ex).build(logger);
} catch (ExecutionSetupException e) {
if (currentReader != null) {
@@ -213,12 +222,15 @@ public class ScanBatch implements CloseableRecordBatch {
logger.error("Close failed for reader " + currentReaderClassName, e2);
}
}
+ lastOutcome = IterOutcome.STOP;
throw UserException.internalError(e)
.addContext("Setup failed for", currentReaderClassName)
.build(logger);
} catch (UserException ex) {
+ lastOutcome = IterOutcome.STOP;
throw ex;
} catch (Exception ex) {
+ lastOutcome = IterOutcome.STOP;
throw UserException.internalError(ex).build(logger);
} finally {
oContext.getStats().stopProcessing();
@@ -559,4 +571,14 @@ public class ScanBatch implements CloseableRecordBatch {
return true;
}
+
+ @Override
+ public boolean hasFailed() {
+ return lastOutcome == IterOutcome.STOP;
+ }
+
+ @Override
+ public void dump() {
+ logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index ba4b94a..22dfdf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -74,6 +74,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
/**
* Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
@@ -185,7 +186,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
// Reset the TopN state for next iteration
resetTopNState();
- try{
+ try {
boolean incomingHasSv2 = false;
switch (incoming.getSchema().getSelectionVectorMode()) {
case NONE: {
@@ -693,4 +694,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return sv4;
}
}
+
+ @Override
+ public void dump() {
+ logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " +
+ "batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 65d0c54..3a8485a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -209,4 +209,10 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
closeWriter();
super.close();
}
+
+ @Override
+ public void dump() {
+ logger.error("WriterRecordBatch[container={}, popConfig={}, counter={}, fragmentUniqueId={}, schema={}]",
+ container, popConfig, counter, fragmentUniqueId, schema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 9de9aae..80d25ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -500,4 +501,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
wasKilled = true;
incoming.kill(sendUpstream);
}
+
+ @Override
+ public void dump() {
+ logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " +
+ "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]",
+ container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema,
+ wasKilled, numGroupByExprs, numAggrExprs, popConfig);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index e8ae30e..f6dd3da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -1603,6 +1604,14 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
+ @Override
+ public String toString() {
+ // The fields are excluded because they are passed from HashAggBatch
+ String[] excludedFields = new String[] {
+ "baseHashTable", "incoming", "outgoing", "context", "oContext", "allocator", "htables", "newIncoming"};
+ return ReflectionToStringBuilder.toStringExclude(this, excludedFields);
+ }
+
// Code-generated methods (implemented in HashAggBatch)
public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 7ebce2b..822a810 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -40,6 +41,9 @@ import java.util.Iterator;
* A class to replace "incoming" - instead scanning a spilled partition file
*/
public class SpilledRecordbatch implements CloseableRecordBatch {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
private VectorContainer container;
private InputStream spillStream;
private int spilledBatches;
@@ -49,6 +53,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
private String spillFile;
VectorAccessibleSerializable vas;
private IterOutcome initialOutcome;
+ // Represents last outcome of next(). If an Exception is thrown
+ // during the method's execution a value IterOutcome.STOP will be assigned.
+ private IterOutcome lastOutcome;
public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
this.context = context;
@@ -66,6 +73,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
}
initialOutcome = next(); // initialize the container
+ lastOutcome = initialOutcome;
}
@Override
@@ -126,14 +134,19 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
@Override
public IterOutcome next() {
- if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
+ if (!context.getExecutorState().shouldContinue()) {
+ lastOutcome = IterOutcome.STOP;
+ return lastOutcome;
+ }
if ( spilledBatches <= 0 ) { // no more batches to read in this partition
this.close();
- return IterOutcome.NONE;
+ lastOutcome = IterOutcome.NONE;
+ return lastOutcome;
}
if ( spillStream == null ) {
+ lastOutcome = IterOutcome.STOP;
throw new IllegalStateException("Spill stream was null");
}
@@ -152,11 +165,16 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
container = vas.get();
}
} catch (IOException e) {
+ lastOutcome = IterOutcome.STOP;
throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+ } catch (Exception e) {
+ lastOutcome = IterOutcome.STOP;
+ throw e;
}
spilledBatches--; // one less batch to read
- return IterOutcome.OK;
+ lastOutcome = IterOutcome.OK;
+ return lastOutcome;
}
/**
@@ -164,6 +182,17 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
*/
public IterOutcome getInitialOutcome() { return initialOutcome; }
+ @Override
+ public void dump() {
+ logger.error("SpilledRecordbatch[container={}, spilledBatches={}, schema={}, spillFile={}, spillSet={}]",
+ container, spilledBatches, schema, spillFile, spillSet);
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return lastOutcome == IterOutcome.STOP;
+ }
+
/**
* Note: ignoring any IO errors (e.g. file not found)
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c42f9bf..2b9b317 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -648,4 +648,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
protected void killIncoming(boolean sendUpstream) {
incoming.kill(sendUpstream);
}
+
+ @Override
+ public void dump() {
+ logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
+ container, popConfig, aggregator, incomingSchema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index f30616b..4bde7ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -498,6 +498,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
public void cleanup() {
}
+ @Override
+ public String toString() {
+ return "StreamingAggTemplate[underlyingIndex=" + underlyingIndex
+ + ", previousIndex=" + previousIndex
+ + ", currentIndex=" + currentIndex
+ + ", addedRecordCount=" + addedRecordCount
+ + ", outputCount=" + outputCount
+ + "]";
+ }
+
public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b8d4e76..179e6c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -43,6 +43,9 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
private SelectionVector2 sv2;
private SelectionVector4 sv4;
private Filterer filter;
@@ -196,4 +199,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
+
+ @Override
+ public void dump() {
+ logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 7b0183b..483a9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -119,4 +119,11 @@ public abstract class FilterTemplate2 implements Filterer {
@Named("outIndex") int outIndex)
throws SchemaChangeException;
+ @Override
+ public String toString() {
+ return "FilterTemplate2[outgoingSelectionVector=" + outgoingSelectionVector
+ + ", incomingSelectionVector=" + incomingSelectionVector
+ + ", svMode=" + svMode
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index e09ed75..d85d6f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -64,4 +64,10 @@ public abstract class FilterTemplate4 implements Filterer {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ @Override
+ public String toString() {
+ return "FilterTemplate4[outgoingSelectionVector=" + outgoingSelectionVector
+ + ", incomingSelectionVector=" + incomingSelectionVector
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 7faaaa5..bc21580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -247,4 +247,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
}
}
}
-}
\ No newline at end of file
+
+ @Override
+ public void dump() {
+ logger.error("RuntimeFilterRecordBatch[container={}, selectionVector={}, toFilterFields={}, "
+ + "originalRecordCount={}, batchSchema={}]",
+ container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 1623319..86ddcd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -532,4 +532,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
updateStats();
super.close();
}
+
+ @Override
+ public void dump() {
+ logger.error("FlattenRecordbatch[hasRemainder={}, remainderIndex={}, recordCount={}, flattener={}, container={}]",
+ hasRemainder, remainderIndex, recordCount, flattener, container);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index e59abac..fe38244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -175,4 +175,15 @@ public abstract class FlattenTemplate implements Flattener {
@Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
public abstract boolean doEval(@Named("inIndex") int inIndex,
@Named("outIndex") int outIndex) throws SchemaChangeException;
+
+ @Override
+ public String toString() {
+ return "FlattenTemplate[svMode=" + svMode
+ + ", fieldToFlatten=" + fieldToFlatten
+ + ", valueIndex=" + valueIndex
+ + ", outputLimit=" + outputLimit
+ + ", innerValueIndex=" + innerValueIndex
+ + ", currentInnerValueIndex=" + currentInnerValueIndex
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5d..89ab8d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1276,4 +1276,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return hj;
}
+ @Override
+ public void dump() {
+ logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
+ " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
+ joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 639f757..71abeda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -432,4 +432,17 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
(joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
ProbeState.DONE; // else we're done
}
+
+ @Override
+ public String toString() {
+ return "HashJoinProbeTemplate[container=" + container
+ + ", probeSchema=" + probeSchema
+ + ", joinType=" + joinType
+ + ", recordsToProcess=" + recordsToProcess
+ + ", recordsProcessed=" + recordsProcessed
+ + ", outputRecords=" + outputRecords
+ + ", probeState=" + probeState
+ + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 1aaf5e2..242687f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1205,4 +1205,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
}
}
+
+ @Override
+ public void dump() {
+ logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " +
+ "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
+ container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex,
+ leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 72f776a..d502c4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -547,4 +547,11 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
}
return materializedExpr;
}
+
+ @Override
+ public void dump() {
+ logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}," +
+ " rightIterator={}, joinStatus={}, joinType={}]",
+ container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status, joinType);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index e2f93ec..6b0c749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -459,4 +459,12 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
public int getRecordCount() {
return outputRecords;
}
+
+ @Override
+ public void dump() {
+ logger.error("NestedLoopJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+ + "leftSchema={}, rightSchema={}, outputRecords={}, rightContainer={}, rightCounts={}]",
+ container, left, right, leftUpstream, rightUpstream,
+ leftSchema, rightSchema, outputRecords, rightContainer, rightCounts);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 784d955..bb49187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -37,7 +37,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
@@ -58,46 +58,46 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public IterOutcome innerNext() {
if (!first && !needMoreRecords(numberOfRecords)) {
- outgoingSv.setRecordCount(0);
- incoming.kill(true);
+ outgoingSv.setRecordCount(0);
+ incoming.kill(true);
- IterOutcome upStream = next(incoming);
+ IterOutcome upStream = next(incoming);
+ if (upStream == IterOutcome.OUT_OF_MEMORY) {
+ return upStream;
+ }
+
+ while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+ // Clear the memory for the incoming batch
+ for (VectorWrapper<?> wrapper : incoming) {
+ wrapper.getValueVector().clear();
+ }
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+ upStream = next(incoming);
if (upStream == IterOutcome.OUT_OF_MEMORY) {
return upStream;
}
-
- while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
- // Clear the memory for the incoming batch
- for (VectorWrapper<?> wrapper : incoming) {
- wrapper.getValueVector().clear();
- }
- // clear memory for incoming sv (if any)
- if (incomingSv != null) {
- incomingSv.clear();
- }
- upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
+ }
+ // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
+ if (upStream == EMIT) {
+ // Clear the memory for the incoming batch
+ for (VectorWrapper<?> wrapper : incoming) {
+ wrapper.getValueVector().clear();
}
- // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
- if (upStream == EMIT) {
- // Clear the memory for the incoming batch
- for (VectorWrapper<?> wrapper : incoming) {
- wrapper.getValueVector().clear();
- }
-
- // clear memory for incoming sv (if any)
- if (incomingSv != null) {
- incomingSv.clear();
- }
-
- refreshLimitState();
- return upStream;
+
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
}
- // other leaf operator behave as before.
- return NONE;
+
+ refreshLimitState();
+ return upStream;
}
+ // other leaf operator behave as before.
+ return NONE;
+ }
return super.innerNext();
}
@@ -266,4 +266,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
first = true;
}
+
+ @Override
+ public void dump() {
+ logger.error("LimitRecordBatch[container={}, offset={}, numberOfRecords={}, incomingSV={}, outgoingSV={}]",
+ container, recordStartOffset, numberOfRecords, incomingSv, outgoingSv);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index fe1660f..ed7b265 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -40,7 +40,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
* implicit column for rowId for each row.
*/
public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
@@ -250,4 +250,12 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti
numberOfRecords = (popConfig.getLast() == null) ?
Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
}
+
+ @Override
+ public void dump() {
+ logger.error("PartitionLimitRecordBatch[container={}, popConfig={}, incomingSV={}, outgoingSV={},"
+ + " recordStartOffset={}, numberOfRecords={}, partitionId={}, unionTypeEnabled={}, state={}]",
+ container, popConfig, incomingSv, outgoingSv, recordStartOffset, numberOfRecords,
+ partitionId, unionTypeEnabled, state);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 791b24a..12ee668 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.mergereceiver;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -827,4 +828,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
super.close();
}
+ @Override
+ public void dump() {
+ logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, "
+ + "tempBatchHolder={}, inputCounts={}, outputCounts={}]",
+ container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets),
+ Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts));
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index a3aa11b..63a0121 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -646,4 +646,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
}
+ @Override
+ public void dump() {
+ logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " +
+ "recordsSampled={}, recordCount={}]",
+ container, popConfig, partitionVectors, partitions, recordsSampled, recordCount);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bbcb758..9385400 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -247,4 +247,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
}
}
+ @Override
+ public void dump() {
+ logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]",
+ container, recordCount, schema, stop);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4d55f00..8ea15d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -898,4 +898,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
wasNone = true;
return IterOutcome.OK_NEW_SCHEMA;
}
+
+ @Override
+ public void dump() {
+ logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]",
+ projector, hasRemainder, remainderIndex, recordCount, container);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 02ccd4b..2f1aa02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -110,4 +110,10 @@ public abstract class ProjectorTemplate implements Projector {
@Named("outIndex") int outIndex)
throws SchemaChangeException;
+ @Override
+ public String toString() {
+ return "Projector[vector2=" + vector2
+ + ", selectionVectorMode=" + svMode
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 620f150..e0beab1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -55,6 +55,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
private final OperatorDriver driver;
private final BatchAccessor batchAccessor;
+ private IterOutcome lastOutcome;
public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
OperatorContext opContext = context.newOperatorContext(config);
@@ -143,7 +144,12 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
public IterOutcome next() {
try {
driver.operatorContext().getStats().startProcessing();
- return driver.next();
+ lastOutcome = driver.next();
+ return lastOutcome;
+ } catch (Exception e) {
+ // mark batch as failed
+ lastOutcome = IterOutcome.STOP;
+ throw e;
} finally {
driver.operatorContext().getStats().stopProcessing();
}
@@ -158,4 +164,14 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
public VectorContainer getContainer() {
return batchAccessor.getOutgoingContainer();
}
+
+ @Override
+ public boolean hasFailed() {
+ return lastOutcome == IterOutcome.STOP;
+ }
+
+ @Override
+ public void dump() {
+ logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fc49d43..fcbb10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -213,4 +213,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
incoming.kill(sendUpstream);
}
+ @Override
+ public void dump() {
+ logger.error("SortBatch[popConfig={}, container={}, sorter={}, schema={}]", popConfig, container, sorter, schema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index da476cc..b3c6a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -70,4 +70,8 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
+ @Override
+ public String toString() {
+ return "SortTemplate[vector4=" + vector4 + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1471d5e..a8c3622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -92,4 +92,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
+
+ @Override
+ public void dump() {
+ logger.error("RemovingRecordBatch[container={}, state={}, copier={}]", container, state, copier);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 50cb26b..d56f848 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-/* TraceRecordBatch contains value vectors which are exactly the same
+/** TraceRecordBatch contains value vectors which are exactly the same
* as the incoming record batch's value vectors. If the incoming
* record batch has a selection vector (type 2) then TraceRecordBatch
* will also contain a selection vector.
@@ -171,4 +171,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
super.close();
}
+ @Override
+ public void dump() {
+ logger.error("TraceRecordBatch[filename={}, logLocation={}, selectionVector={}]", getFileName(), logLocation, sv);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 7e16d6a..e83fddf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -440,4 +440,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}
+ @Override
+ public void dump() {
+ logger.error("UnionAllRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, "
+ + "recordCount={}]", container, left, right, leftUpstream, rightUpstream, recordCount);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 285481f..508999f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -191,4 +191,14 @@ public class UnnestImpl implements Unnest {
transfers = null;
}
}
+
+ @Override
+ public String toString() {
+ return "UnnestImpl[svMode=" + svMode
+ + ", outputLimit=" + outputLimit
+ + ", valueIndex=" + valueIndex
+ + ", innerValueIndex=" + innerValueIndex
+ + ", runningInnerValueIndex=" + runningInnerValueIndex
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 5f63967..1c8336d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -241,7 +241,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
}
}
- @Override
+ @Override
public VectorContainer getOutgoingContainer() {
return this.container;
}
@@ -446,4 +446,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
super.close();
}
+ @Override
+ public void dump() {
+ logger.error("UnnestRecordBatch[container={}, unnest={}, hasRemainder={}, remainderIndex={}, " +
+ "unnestFieldMetadata={}]", container, unnest, hasRemainder, remainderIndex, unnestFieldMetadata);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8cdc0a1..a6ebef0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -64,6 +64,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
private boolean first = true;
private final UnorderedReceiver config;
private final OperatorContext oContext;
+ // Represents last outcome of next(). If an Exception is thrown
+ // during the method's execution a value IterOutcome.STOP will be assigned.
+ private IterOutcome lastOutcome;
public enum Metric implements MetricDef {
BYTES_RECEIVED,
@@ -156,7 +159,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
public IterOutcome next() {
batchLoader.resetRecordCount();
stats.startProcessing();
- try{
+ try {
RawFragmentBatch batch;
try {
stats.startWait();
@@ -174,15 +177,17 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
first = false;
if (batch == null) {
+ lastOutcome = IterOutcome.NONE;
batchLoader.zero();
if (!context.getExecutorState().shouldContinue()) {
- return IterOutcome.STOP;
+ lastOutcome = IterOutcome.STOP;
}
- return IterOutcome.NONE;
+ return lastOutcome;
}
if (context.getAllocator().isOverLimit()) {
- return IterOutcome.OUT_OF_MEMORY;
+ lastOutcome = IterOutcome.OUT_OF_MEMORY;
+ return lastOutcome;
}
final RecordBatchDef rbd = batch.getHeader().getDef();
@@ -195,14 +200,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
if(schemaChanged) {
this.schema = batchLoader.getSchema();
stats.batchReceived(0, rbd.getRecordCount(), true);
- return IterOutcome.OK_NEW_SCHEMA;
+ lastOutcome = IterOutcome.OK_NEW_SCHEMA;
} else {
stats.batchReceived(0, rbd.getRecordCount(), false);
- return IterOutcome.OK;
+ lastOutcome = IterOutcome.OK;
}
- } catch(SchemaChangeException | IOException ex) {
+ return lastOutcome;
+ } catch (SchemaChangeException | IOException ex) {
context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
+ lastOutcome = IterOutcome.STOP;
+ return lastOutcome;
+ } catch (Exception e) {
+ lastOutcome = IterOutcome.STOP;
+ throw e;
} finally {
stats.stopProcessing();
}
@@ -270,4 +280,14 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
}
}
}
+
+ @Override
+ public void dump() {
+ logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema);
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return lastOutcome == IterOutcome.STOP;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 88f4c7d..1ea3895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -322,8 +322,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
}
return batchState;
- }
- catch (RuntimeException | Error e) {
+ } catch (RuntimeException | Error e) {
exceptionState = e;
logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
instNum, batchTypeName, prevBatchState, exceptionState,
@@ -366,4 +365,14 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
public RecordBatch getIncoming() { return incoming; }
+ @Override
+ public boolean hasFailed() {
+ return exceptionState != null || batchState == STOP;
+ }
+
+ @Override
+ public void dump() {
+ logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
+ + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5cd6de7..1e477ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -295,6 +295,17 @@ public abstract class FrameSupportTemplate implements WindowFramer {
internal.clear();
}
+ @Override
+ public String toString() {
+ return "FrameSupportTemplate[internal=" + internal
+ + ", outputCount=" + outputCount
+ + ", current=" + current
+ + ", frameLastRow=" + frameLastRow
+ + ", remainingRows=" + remainingRows
+ + ", partialPartition=" + partialPartition
+ + "]";
+ }
+
/**
* called once for each peer row of the current frame.
* @param index of row to aggregate
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index 55c27c1..cc7a04d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -298,6 +298,17 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
internal.clear();
}
+ @Override
+ public String toString() {
+ return "FrameSupportTemplate[internal=" + internal
+ + ", outputCount=" + outputCount
+ + ", current=" + current
+ + ", requireFullPartition=" + requireFullPartition
+ + ", partition=" + partition
+ + "]";
+ }
+
+
/**
* called once for each row after we evaluate all peer rows. Used to write a value in the row
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index 7d98724..a9baea9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -106,4 +106,9 @@ public class WindowDataBatch implements VectorAccessible {
public void clear() {
container.clear();
}
+
+ @Override
+ public String toString() {
+ return "WindowDataBatch[container=" + container + ", recordCount=" + recordCount + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index a372a3c..59e84ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.window;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
@@ -426,4 +427,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
public int getRecordCount() {
return framers[0].getOutputCount();
}
+
+ @Override
+ public void dump() {
+ logger.error("WindowFrameRecordBatch[container={}, popConfig={}, framers={}, schema={}]",
+ container, popConfig, Arrays.toString(framers), schema);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0edf974..262a241 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -824,4 +824,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
incoming.kill(sendUpstream);
}
+ @Override
+ public void dump() {
+ logger.error("ExternalSortBatch[schema={}, sorter={}, mSorter={}, container={}]", schema, sorter, mSorter, container);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 5ebec50..9b10d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -206,4 +206,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
public abstract int doEval(@Named("leftIndex") int leftIndex,
@Named("rightIndex") int rightIndex)
throws SchemaChangeException;
+
+ @Override
+ public String toString() {
+ return "MSortTemplate[vector4=" + vector4
+ + ", aux=" + aux
+ + ", runStarts=" + runStarts
+ + ", desiredRecordBatchCount=" + desiredRecordBatchCount
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index de783df..57d2ec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -83,4 +83,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
public abstract int doEval(@Named("leftIndex") char leftIndex,
@Named("rightIndex") char rightIndex)
throws SchemaChangeException;
+
+ @Override
+ public String toString() {
+ return "SinglebatchSorterTemplate[vector2=" + vector2 + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 7db4d3b..8fc4a74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -698,4 +698,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
}
+
+ @Override
+ public void dump() {
+ logger.error("ExternalSortBatch[schema={}, sortState={}, sortConfig={}, outputWrapperContainer={}, "
+ + "outputSV4={}, container={}]",
+ schema, sortState, sortConfig, outputWrapperContainer, outputSV4, container);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index e592ccb..fbcf1ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -137,6 +137,15 @@ public class SortConfig {
mergeBatchSize, mSortBatchSize);
}
+ @Override
+ public String toString() {
+ return "SortConfig[spillFileSize=" + spillFileSize
+ + ", spillBatchSize=" + spillBatchSize
+ + ", mergeBatchSize=" + mergeBatchSize
+ + ", mSortBatchSize=" + mSortBatchSize
+ + "]";
+ }
+
public long maxMemory() { return maxMemory; }
public int mergeLimit() { return mergeLimit; }
public long spillFileSize() { return spillFileSize; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index ac30e94..79294cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -592,4 +592,12 @@ public class SortImpl {
throw ex;
}
}
+
+ @Override
+ public String toString() {
+ return "SortImpl[config=" + config
+ + ", outputBatch=" + outputBatch
+ + ", sizer=" + sizer
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c38de2d..362ea29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -47,6 +47,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected final boolean unionTypeEnabled;
protected BatchState state;
+ // Represents last outcome of next(). If an Exception is thrown
+ // during the method's execution a value IterOutcome.STOP will be assigned.
+ private IterOutcome lastOutcome;
+
protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
this(popConfig, context, true, context.newOperatorContext(popConfig));
}
@@ -113,7 +117,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
public final IterOutcome next(final int inputIndex, final RecordBatch b){
- IterOutcome next = null;
+ IterOutcome next;
stats.stopProcessing();
try{
if (!context.getExecutorState().shouldContinue()) {
@@ -132,15 +136,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
switch(next) {
- case OK_NEW_SCHEMA:
- stats.batchReceived(inputIndex, b.getRecordCount(), true);
- break;
- case OK:
- case EMIT:
- stats.batchReceived(inputIndex, b.getRecordCount(), false);
- break;
- default:
- break;
+ case OK_NEW_SCHEMA:
+ stats.batchReceived(inputIndex, b.getRecordCount(), true);
+ break;
+ case OK:
+ case EMIT:
+ stats.batchReceived(inputIndex, b.getRecordCount(), false);
+ break;
+ default:
+ break;
}
return next;
@@ -155,27 +159,38 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
buildSchema();
switch (state) {
case DONE:
- return IterOutcome.NONE;
+ lastOutcome = IterOutcome.NONE;
+ break;
case OUT_OF_MEMORY:
// because we don't support schema changes, it is safe to fail the query right away
context.getExecutorState().fail(UserException.memoryError()
.build(logger));
// FALL-THROUGH
case STOP:
- return IterOutcome.STOP;
+ lastOutcome = IterOutcome.STOP;
+ break;
default:
state = BatchState.FIRST;
- return IterOutcome.OK_NEW_SCHEMA;
+ lastOutcome = IterOutcome.OK_NEW_SCHEMA;
+ break;
}
+ break;
}
case DONE: {
- return IterOutcome.NONE;
+ lastOutcome = IterOutcome.NONE;
+ break;
}
default:
- return innerNext();
+ lastOutcome = innerNext();
+ break;
}
+ return lastOutcome;
} catch (final SchemaChangeException e) {
+ lastOutcome = IterOutcome.STOP;
throw new DrillRuntimeException(e);
+ } catch (Exception e) {
+ lastOutcome = IterOutcome.STOP;
+ throw e;
} finally {
stats.stopProcessing();
}
@@ -245,6 +260,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return container;
}
+ @Override
+ public boolean hasFailed() {
+ return lastOutcome == IterOutcome.STOP;
+ }
+
public RecordBatchStatsContext getRecordBatchStatsContext() {
return batchStatsContext;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index cb3a61c..dda4ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -59,6 +59,5 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato
setIncoming(incoming.getIncoming());
lateral = incoming;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c65827c..7473c8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -320,4 +320,19 @@ public interface RecordBatch extends VectorAccessible {
* buffers.
*/
WritableBatch getWritableBatch();
+
+ /**
+ * Perform dump of this batch's state to logs.
+ */
+ void dump();
+
+ /**
+ * Use this method to see if the batch has failed. Currently used when logging {@code RecordBatch}'s
+ * state using {@link #dump()} method.
+ *
+ * @return {@code true} if either {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP}
+ * was returned by its or child's {@link #next()} invocation or there was an {@code Exception} thrown
+ * during execution of the batch; {@code false} otherwise
+ */
+ boolean hasFailed();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e841f2f..696d6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -292,4 +292,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
container.clear();
resetRecordCount();
}
+
+ @Override
+ public String toString() {
+ return "RecordBatchLoader[container=" + container
+ + ", valueCount=" + valueCount
+ + ", schema=" + schema
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 47b11a6..04cf32e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -367,4 +367,18 @@ public class RecordIterator implements VectorAccessible {
clear();
clearInflightBatches();
}
+
+ @Override
+ public String toString() {
+ return "RecordIterator[outerPosition=" + outerPosition
+ + ", innerPosition=" + innerPosition
+ + ", innerRecordCount=" + innerRecordCount
+ + ", totalRecordCount=" + totalRecordCount
+ + ", startBatchPosition=" + startBatchPosition
+ + ", markedInnerPosition" + markedInnerPosition
+ + ", markedOuterPosition=" + markedOuterPosition
+ + ", lastOutcome=" + lastOutcome
+ + ", inputIndex=" + inputIndex
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 9dfa129..e4278ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -107,4 +107,14 @@ public class SchemalessBatch implements CloseableRecordBatch {
@Override
public VectorContainer getContainer() { return null; }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
+ @Override
+ public void dump() {
+ logger.error("SchemalessBatch[]");
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index 4063e55..c588f25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -28,6 +28,9 @@ import java.util.Iterator;
* Wrap a VectorContainer into a record batch.
*/
public class SimpleRecordBatch implements RecordBatch {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
+
private VectorContainer container;
private FragmentContext context;
@@ -99,4 +102,14 @@ public class SimpleRecordBatch implements RecordBatch {
public VectorContainer getContainer() {
return container;
}
+
+ @Override
+ public void dump() {
+ logger.error("SimpleRecordBatch[container=" + container + "]");
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index a0b47ed..4f4f88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -161,4 +161,13 @@ public class SelectionVector4 implements AutoCloseable {
public void close() {
clear();
}
+
+ @Override
+ public String toString() {
+ return "SelectionVector4[data=" + data
+ + ", recordCount=" + recordCount
+ + ", start=" + start
+ + ", length=" + length
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 7accdc4..9314da6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -104,5 +104,4 @@ public abstract class AbstractRecordReader implements RecordReader {
protected List<SchemaPath> getDefaultColumnsToRead() {
return GroupScan.ALL_COLUMNS;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index a0dda01..edd91d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,8 +26,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.vector.ValueVector;
public interface RecordReader extends AutoCloseable {
- public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
- public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+ long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+ long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
/**
* Configure the RecordReader with the provided schema and the record batch that should be written to.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
index 68c3c36..31c0103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -222,4 +222,9 @@ public class StorageStrategy {
fs.deleteOnExit(path);
}
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ return "StorageStrategy[umask=" + umask + ", deleteOnExist=" + deleteOnExit + "]";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 6945fff..7668130 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -66,7 +66,7 @@ import org.joda.time.DateTimeConstants;
/**
* A RecordReader implementation for Avro data files.
*
- * @see RecordReader
+ * @see org.apache.drill.exec.store.RecordReader
*/
public class AvroRecordReader extends AbstractRecordReader {
@@ -398,4 +398,17 @@ public class AvroRecordReader extends AbstractRecordReader {
}
}
}
+
+ @Override
+ public String toString() {
+ long currentPosition = -1L;
+ try {
+ currentPosition = reader.tell();
+ } catch (IOException e) {
+ logger.trace("Unable to obtain reader position: " + e.getMessage());
+ }
+ return "AvroRecordReader[File=" + hadoop
+ + ", Position=" + currentPosition
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
index 5d9e105..2580010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -54,6 +53,8 @@ public class BsonRecordReader {
private final boolean readNumbersAsDouble;
protected DrillBuf workBuf;
private String currentFieldName;
+ // Used for error context
+ private BsonReader reader;
public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
@@ -67,6 +68,7 @@ public class BsonRecordReader {
}
public void write(ComplexWriter writer, BsonReader reader) throws IOException {
+ this.reader = reader;
reader.readStartDocument();
BsonType readBsonType = reader.getCurrentBsonType();
switch (readBsonType) {
@@ -364,17 +366,20 @@ public class BsonRecordReader {
}
}
- public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field,
- String msg, Object... args) {
- return null;
- }
-
- public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) {
- return null;
- }
-
private void ensure(final int length) {
workBuf = workBuf.reallocIfNeeded(length);
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("BsonRecordReader[");
+ if (reader != null) {
+ sb.append("Name=")
+ .append(reader.getCurrentName())
+ .append(", Type=")
+ .append(reader.getCurrentBsonType());
+ }
+ sb.append(']');
+ return sb.toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 7a4e4a2..379e2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -103,4 +103,12 @@ public class EasyWriter extends AbstractWriter {
public int getOperatorType() {
return formatPlugin.getWriterOperatorType();
}
+
+ @Override
+ public String toString() {
+ return "EasyWriter[location=" + location
+ + ", storageStrategy=" + getStorageStrategy()
+ + ", partitionColumns=" + partitionColumns
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4b8bbf8..62ace66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -104,7 +104,7 @@ public class JSONRecordReader extends AbstractRecordReader {
"One of inputPath or embeddedContent must be set but not both."
);
- if(inputPath != null) {
+ if (inputPath != null) {
this.hadoopPath = new Path(inputPath);
} else {
this.embeddedContent = embeddedContent;
@@ -126,9 +126,11 @@ public class JSONRecordReader extends AbstractRecordReader {
public String toString() {
return super.toString()
+ "[hadoopPath = " + hadoopPath
+ + ", currentRecord=" + currentRecordNumberInFile()
+ + ", jsonReader=" + jsonReader
+ ", recordCount = " + recordCount
+ ", parseErrorCount = " + parseErrorCount
- + ", runningRecordCount = " + runningRecordCount + ", ...]";
+ + ", runningRecordCount = " + runningRecordCount + ", ...]";
}
@Override
@@ -162,9 +164,9 @@ public class JSONRecordReader extends AbstractRecordReader {
}
private void setupParser() throws IOException {
- if(hadoopPath != null){
+ if (hadoopPath != null) {
jsonReader.setSource(stream);
- }else{
+ } else {
jsonReader.setSource(embeddedContent);
}
jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
@@ -182,7 +184,7 @@ public class JSONRecordReader extends AbstractRecordReader {
}
UserException.Builder exceptionBuilder = UserException.dataReadError(e)
- .message("%s - %s", suffix, message);
+ .message("%s - %s", suffix, message);
if (columnNr > 0) {
exceptionBuilder.pushContext("Column ", columnNr);
}
@@ -205,36 +207,32 @@ public class JSONRecordReader extends AbstractRecordReader {
writer.reset();
recordCount = 0;
parseErrorCount = 0;
- if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+ if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
return recordCount;
}
- outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
- try{
+ while (recordCount < DEFAULT_ROWS_PER_BATCH) {
+ try {
writer.setPosition(recordCount);
write = jsonReader.write(writer);
- if(write == ReadState.WRITE_SUCCEED){
+ if (write == ReadState.WRITE_SUCCEED) {
recordCount++;
- }
- else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
- if(skipMalformedJSONRecords == false){
- handleAndRaise("Error parsing JSON", new Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1)));
+ } else if (write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+ if (!skipMalformedJSONRecords) {
+ handleAndRaise("Error parsing JSON", new Exception());
}
++parseErrorCount;
- if(printSkippedMalformedJSONRecordLineNumber){
- logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount+parseErrorCount));
+ if (printSkippedMalformedJSONRecordLineNumber) {
+ logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount));
}
- if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
- break outside;
+ if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
+ break;
}
+ } else {
+ break;
}
- else{
- break outside;
- }
+ } catch (IOException ex) {
+ handleAndRaise("Error parsing JSON", ex);
}
- catch(IOException ex)
- {
- handleAndRaise("Error parsing JSON", ex);
- }
}
// Skip empty json file with 0 row.
// Only when data source has > 0 row, ensure the batch has one field.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index fba80e5..adab033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
public interface JsonProcessor {
- public static enum ReadState {
+ enum ReadState {
END_OF_STREAM,
JSON_RECORD_PARSE_ERROR,
JSON_RECORD_PARSE_EOF_ERROR,
@@ -41,17 +41,11 @@ public interface JsonProcessor {
void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
- public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder,
- String field,
- String msg,
- Object... args);
+ UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String message);
- public UserException.Builder getExceptionWithContext(Throwable exception,
- String field,
- String msg,
- Object... args);
+ UserException.Builder getExceptionWithContext(Throwable exception, String message);
- public boolean ignoreJSONParseError();
+ boolean ignoreJSONParseError();
- public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
+ void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index aaa74ae..48a1464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
@@ -49,8 +50,12 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
protected JsonParser parser;
protected DrillBuf workBuf;
- protected JsonToken lastSeenJsonToken = null;
- boolean ignoreJSONParseErrors = false; // default False
+ protected JsonToken lastSeenJsonToken;
+ boolean ignoreJSONParseErrors;
+ /**
+ * The name of the current field being parsed. For Error messages.
+ */
+ protected String currentFieldName;
/**
*
@@ -90,26 +95,31 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
}
@Override
- public UserException.Builder getExceptionWithContext(
- UserException.Builder exceptionBuilder, String field, String msg,
- Object... args) {
- if (msg != null) {
- exceptionBuilder.message(msg, args);
- }
- if (field != null) {
- exceptionBuilder.pushContext("Field ", field);
+ public String toString() {
+ JsonLocation location = parser.getCurrentLocation();
+ return getClass().getSimpleName() + "[Line=" + location.getLineNr()
+ + ", Column=" + (location.getColumnNr() + 1)
+ + ", Field=" + getCurrentField()
+ + "]";
+ }
+
+ @Override
+ public UserException.Builder getExceptionWithContext(UserException.Builder builder, String message) {
+ builder.message(message);
+ JsonLocation location = parser.getCurrentLocation();
+ builder.addContext("Line", location.getLineNr())
+ .addContext("Column", location.getColumnNr() + 1);
+ String fieldName = getCurrentField();
+ if (fieldName != null) {
+ builder.addContext("Field", fieldName);
}
- exceptionBuilder.pushContext("Column ",
- parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ",
- parser.getCurrentLocation().getLineNr());
- return exceptionBuilder;
+ return builder;
}
@Override
- public UserException.Builder getExceptionWithContext(Throwable e,
- String field, String msg, Object... args) {
+ public UserException.Builder getExceptionWithContext(Throwable e, String message) {
UserException.Builder exceptionBuilder = UserException.dataReadError(e);
- return getExceptionWithContext(exceptionBuilder, field, msg, args);
+ return getExceptionWithContext(exceptionBuilder, message);
}
/*
@@ -138,4 +148,8 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
}
return JsonExceptionProcessingState.PROC_SUCCEED;
}
+
+ protected String getCurrentField() {
+ return currentFieldName;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index a5e9f1a..0f92ec5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonToken;
import io.netty.buffer.DrillBuf;
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
public class CountingJsonReader extends BaseJsonProcessor {
@@ -41,18 +39,14 @@ public class CountingJsonReader extends BaseJsonProcessor {
token = parser.nextToken();
}
lastSeenJsonToken = null;
+ if (token == JsonToken.FIELD_NAME) {
+ currentFieldName = parser.getText();
+ }
if (!parser.hasCurrentToken()) {
return ReadState.END_OF_STREAM;
} else if (token != JsonToken.START_OBJECT) {
throw new com.fasterxml.jackson.core.JsonParseException(
- parser,
- String
- .format(
- "Cannot read from the middle of a record. Current token was %s ",
- token));
- // throw new
- // IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s",
- // token));
+ parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
}
writer.rootAsMap().bit("count").writeBit(1);
parser.skipChildren();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
index 3d88b1a..549df82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
@@ -163,4 +163,19 @@ public class SequenceFileRecordReader extends AbstractRecordReader {
logger.warn("Exception closing reader: {}", e);
}
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ long position = -1L;
+ try {
+ if (reader != null) {
+ position = reader.getPos();
+ }
+ } catch (IOException e) {
+ logger.trace("Unable to obtain reader position: " + e.getMessage());
+ }
+ return "SequenceFileRecordReader[File=" + split.getPath()
+ + ", Position=" + position
+ + "]";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 9a1d486..7aa9b04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -216,7 +216,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
} catch (IOException | TextParsingException e) {
throw UserException.dataReadError(e)
.addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
- split.getPath(), reader.getPos())
+ split.getPath(), reader.getPos())
.build(logger);
}
}
@@ -248,4 +248,11 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
logger.warn("Exception while closing stream.", e);
}
}
+
+ @Override
+ public String toString() {
+ return "CompliantTextRecordReader[File=" + split.getPath()
+ + ", reader=" + reader
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
index a181d42..7a9ed46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -500,4 +500,12 @@ final class TextReader {
input.close();
}
+ @Override
+ public String toString() {
+ return "TextReader[Line=" + context.currentLine()
+ + ", Column=" + context.currentChar()
+ + ", Record=" + context.currentRecord()
+ + ", Byte pos=" + getPos()
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 18437df..f43bb88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -242,6 +242,14 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
}
}
+ @Override
+ public String toString() {
+ return "HttpdLogRecordReader[Path=" + work.getPath()
+ + ", Start=" + work.getStart()
+ + ", Length=" + work.getLength()
+ + ", Line=" + lineNumber.get()
+ + "]";
+ }
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 91f8b99..2a4b4fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -261,7 +261,7 @@ public class ImageRecordReader extends AbstractRecordReader {
}
private void processXmpDirectory(final MapWriter writer, final XmpDirectory directory) {
- HashSet<String> listItems = new HashSet();
+ HashSet<String> listItems = new HashSet<>();
XMPMeta xmpMeta = directory.getXMPMeta();
if (xmpMeta != null) {
try {
@@ -490,4 +490,9 @@ public class ImageRecordReader extends AbstractRecordReader {
metadataStream.close();
}
}
+
+ @Override
+ public String toString() {
+ return "ImageRecordReader[Path=" + hadoopPath.toUri().getPath() + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index 56bc1cc..e5d1dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -761,4 +761,11 @@ public class LogRecordReader extends AbstractRecordReader {
reader = null;
}
}
-}
\ No newline at end of file
+
+ @Override
+ public String toString() {
+ return "LogRecordReader[File=" + fileWork.getPath()
+ + ", Line=" + rowIndex
+ + "]";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 1cd393b..aea3218 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -121,4 +121,11 @@ public class ParquetWriter extends AbstractWriter {
return CoreOperatorType.PARQUET_WRITER_VALUE;
}
+ @Override
+ public String toString() {
+ return "ParquetWriter[location=" + location
+ + ", storageStrategy=" + getStorageStrategy()
+ + ", partitionColumns=" + partitionColumns
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2ada17d..17cf8c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -348,4 +348,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
}
}
+
+ @Override
+ public String toString() {
+ return "ParquetRecordReader[File=" + hadoopPath.toUri()
+ + ", Row group index=" + rowGroupIndex
+ + ", Records in row group=" + footer.getBlocks().get(rowGroupIndex).getRowCount()
+ + ", Total records read=" + (readState != null ? readState.recordsRead() : -1)
+ + ", Metadata" + footer
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0c69d6d..7108ca6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -344,4 +344,9 @@ public class DrillParquetReader extends AbstractRecordReader {
this.type = type;
}
}
+
+ @Override
+ public String toString() {
+ return "DrillParquetReader[pageReadStore=" + pageReadStore + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 794687f..d688f3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -396,4 +396,9 @@ public class PcapRecordReader extends AbstractRecordReader {
.setSafe(count, value, 0, value.remaining());
}
}
+
+ @Override
+ public String toString() {
+ return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index d2cd9a8..0db17fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -232,4 +232,13 @@ public class DrillTextRecordReader extends AbstractRecordReader {
logger.warn("Exception closing reader: {}", e);
}
}
+
+ @Override
+ public String toString() {
+ return "DrillTextRecordReader[File=" + split.getPath()
+ + ", Record=" + (totalRecordsRead + 1)
+ + ", Start=" + split.getStart()
+ + ", Length=" + split.getLength()
+ + "]";
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 2810c04..aaa9806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -70,10 +70,6 @@ public class JsonReader extends BaseJsonProcessor {
* outer list.
*/
private boolean inOuterList;
- /**
- * The name of the current field being parsed. For Error messages.
- */
- private String currentFieldName;
private FieldSelection selection;
@@ -214,10 +210,9 @@ public class JsonReader extends BaseJsonProcessor {
case WRITE_SUCCEED:
break;
default:
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null).message(
- "Failure while reading JSON. (Got an invalid read state %s )",
- readState.toString()).build(logger);
+ throw getExceptionWithContext(UserException.dataReadError(), null).message(
+ "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+ .build(logger);
}
} catch (com.fasterxml.jackson.core.JsonParseException ex) {
if (ignoreJSONParseError()) {
@@ -236,13 +231,10 @@ public class JsonReader extends BaseJsonProcessor {
private void confirmLast() throws IOException {
parser.nextToken();
if (!parser.isClosed()) {
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null)
- .message(
- "Drill attempted to unwrap a toplevel list "
- + "in your document. However, it appears that there is trailing content after this top level list. Drill only "
- + "supports querying a set of distinct maps or a single json array with multiple inner maps.")
- .build(logger);
+ String message = "Drill attempted to unwrap a toplevel list in your document. "
+ + "However, it appears that there is trailing content after this top level list. Drill only "
+ + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+ throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
}
}
@@ -255,11 +247,9 @@ public class JsonReader extends BaseJsonProcessor {
break;
case START_ARRAY:
if (inOuterList) {
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null)
- .message(
- "The top level of your document must either be a single array of maps or a set "
- + "of white space delimited maps.").build(logger);
+ String message = "The top level of your document must either be a single array of maps or a set "
+ + "of white space delimited maps.";
+ throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
}
if (skipOuterList) {
@@ -268,11 +258,9 @@ public class JsonReader extends BaseJsonProcessor {
inOuterList = true;
writeDataSwitch(writer.rootAsMap());
} else {
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null)
- .message(
- "The top level of your document must either be a single array of maps or a set "
- + "of white space delimited maps.").build(logger);
+ String message = "The top level of your document must either be a single array of maps or a set "
+ + "of white space delimited maps.";
+ throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
}
} else {
@@ -285,17 +273,14 @@ public class JsonReader extends BaseJsonProcessor {
confirmLast();
return ReadState.END_OF_STREAM;
} else {
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null).message(
- "Failure while parsing JSON. Ran across unexpected %s.",
- JsonToken.END_ARRAY).build(logger);
+ throw getExceptionWithContext(UserException.dataReadError(), null).message(
+ "Failure while parsing JSON. Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
}
case NOT_AVAILABLE:
return ReadState.END_OF_STREAM;
default:
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null)
+ throw getExceptionWithContext(UserException.dataReadError(), null)
.message(
"Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing "
+ "json strings that contain either lists or maps. The root object cannot be a scalar.",
@@ -412,9 +397,9 @@ public class JsonReader extends BaseJsonProcessor {
break;
default:
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null).message("Unexpected token %s",
- parser.getCurrentToken()).build(logger);
+ throw getExceptionWithContext(UserException.dataReadError(), null)
+ .message("Unexpected token %s", parser.getCurrentToken())
+ .build(logger);
}
}
@@ -478,8 +463,7 @@ public class JsonReader extends BaseJsonProcessor {
break;
default:
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null).message("Unexpected token %s",
+ throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
parser.getCurrentToken()).build(logger);
}
}
@@ -591,8 +575,7 @@ public class JsonReader extends BaseJsonProcessor {
.build(logger);
}
} catch (Exception e) {
- throw getExceptionWithContext(e, this.currentFieldName, null).build(
- logger);
+ throw getExceptionWithContext(e, null).build(logger);
}
}
list.endList();
@@ -637,8 +620,7 @@ public class JsonReader extends BaseJsonProcessor {
handleString(parser, list);
break;
default:
- throw getExceptionWithContext(UserException.dataReadError(),
- currentFieldName, null).message("Unexpected token %s",
+ throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
parser.getCurrentToken()).build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7cb07eb..a9e9e62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -207,6 +207,10 @@ public class FragmentExecutor implements Runnable {
}
private void cleanup(FragmentState state) {
+ if (fragmentState.get() == FragmentState.FAILED) {
+ root.dumpBatches();
+ }
+
closeOutResources();
updateState(state);
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 89731ff..4bb1a22 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -269,4 +269,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
public long getRowCount() {
return rowCount;
}
+
+ @Override
+ public String toString() {
+ return "ColumnChunkIncReadStore[File=" + path.toUri() + "]";
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
new file mode 100644
index 0000000..18ba61c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOperatorDump extends ClusterTest {
+
+ private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed";
+ private static final String ENTRY_DUMP_STARTED = "Batch dump started";
+
+ private LogFixture logFixture;
+ private EventAwareContextAppender appender;
+
+ @BeforeClass
+ public static void setupFiles() {
+ dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+ }
+
+ @Before
+ public void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ appender = new EventAwareContextAppender();
+ logFixture = LogFixture.builder()
+ .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
+ .build();
+ startCluster(builder);
+ }
+
+ @After
+ public void tearDown(){
+ logFixture.close();
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testScanBatchChecked() throws Exception {
+ String exceptionDesc = "next-allocate";
+ final String controls = Controls.newBuilder()
+ .addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1)
+ .build();
+ ControlsInjectionUtil.setControls(client.client(), controls);
+ String query = "select * from dfs.`multilevel/parquet` limit 100";
+ try {
+ client.queryBuilder().sql(query).run();
+ } catch (UserRemoteException e) {
+ assertTrue(e.getMessage().contains(exceptionDesc));
+
+ String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+ validateContainsEntries(expectedEntries, ScanBatch.class.getName());
+ throw e;
+ }
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testExternalSortUnchecked() throws Exception {
+ Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class;
+ final String controls = Controls.newBuilder()
+ .addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class)
+ .build();
+ ControlsInjectionUtil.setControls(client.client(), controls);
+ String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
+ try {
+ client.queryBuilder().sql(query).run();
+ } catch (UserRemoteException e) {
+ assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT));
+
+ String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
+ validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName());
+ throw e;
+ }
+ }
+
+ private void validateContainsEntries(String[] entries, String expectedClassName) {
+ if (entries == null) {
+ entries = new String[0];
+ }
+ List<String> messages = appender.getMessages();
+ List<String> entryList = new ArrayList<>(entries.length);
+ Collections.addAll(entryList, entries);
+ Iterator<String> it = entryList.iterator();
+ while (it.hasNext()) {
+ String entry = it.next();
+ for (String message : messages) {
+ if (message.contains(entry)) {
+ it.remove();
+ break;
+ }
+ }
+ }
+ assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty());
+
+ Set<String> loggerNames = appender.getLoggerNames();
+ assertTrue(String.format("Entry for class %s was not found", expectedClassName),
+ loggerNames.contains(expectedClassName));
+ }
+
+ // ConsoleAppender which stores logged events
+ private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> {
+
+ private List<ILoggingEvent> events = new ArrayList<>();
+
+ @Override
+ protected void append(ILoggingEvent e) {
+ events.add(e);
+ }
+
+ List<String> getMessages() {
+ return events.stream()
+ .map(ILoggingEvent::getMessage)
+ .collect(Collectors.toList());
+ }
+
+ Set<String> getLoggerNames() {
+ return events.stream()
+ .map(ILoggingEvent::getLoggerName)
+ .collect(Collectors.toSet());
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 34d735e..94e0c0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -288,6 +288,15 @@ public class MockRecordBatch implements CloseableRecordBatch {
this.limitWithUnnest = limitWithUnnest;
}
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
+ @Override
+ public void dump() {
+ }
+
public static class Builder {
private final List<RowSet> rowSets = new ArrayList<>();
private final List<IterOutcome> iterOutcomes = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 6d5b666..a176646 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -106,6 +106,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
}
@Override
+ public void dumpBatches() {
+ screenRoot.dumpBatches();
+ }
+
+ @Override
public void close() throws Exception {
screenRoot.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index c7105f9..aefa28a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -162,6 +162,15 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
}
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
+ @Override
+ public void dump() {
+ }
+
@Override public int getRecordCount() {
return 0;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 6f1e6e0..c05cdfd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -121,6 +121,15 @@ public class BloomFilterTest {
public Iterator<VectorWrapper<?>> iterator() {
return null;
}
+
+ @Override
+ public void dump() {
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index b54b0b0..b62a188 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -86,6 +86,7 @@ public class LogFixture implements AutoCloseable {
private String consoleFormat = DEFAULT_CONSOLE_FORMAT;
private boolean logToConsole;
private List<LogSpec> loggers = new ArrayList<>();
+ private ConsoleAppender<ILoggingEvent> appender;
/**
* Send all enabled logging to the console (if not already configured.) Some
@@ -102,6 +103,11 @@ public class LogFixture implements AutoCloseable {
return this;
}
+ public LogFixtureBuilder toConsole(ConsoleAppender<ILoggingEvent> appender, String format) {
+ this.appender = appender;
+ return toConsole(format);
+ }
+
/**
* Send logging to the console using the defined format.
*
@@ -195,7 +201,7 @@ public class LogFixture implements AutoCloseable {
private void setupConsole(LogFixtureBuilder builder) {
drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
- if (drillLogger.getAppender("STDOUT") != null) {
+ if (builder.appender == null && drillLogger.getAppender("STDOUT") != null) {
return;
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
@@ -204,10 +210,10 @@ public class LogFixture implements AutoCloseable {
ple.setContext(lc);
ple.start();
- appender = new ConsoleAppender<>( );
+ appender = builder.appender == null ? new ConsoleAppender<>() : builder.appender;
appender.setContext(lc);
appender.setName("Console");
- appender.setEncoder( ple );
+ appender.setEncoder(ple);
appender.start();
Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);