Posted to commits@drill.apache.org by ih...@apache.org on 2020/02/12 10:52:09 UTC
[drill] 05/05: DRILL-7576: Fail fast for operator errors
This is an automated email from the ASF dual-hosted git repository.
ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 09b805aea4dafe50555b23945302cf8f6c491de8
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sat Feb 8 22:36:25 2020 -0800
DRILL-7576: Fail fast for operator errors
Converts operators to fail with a UserException rather than using
the STOP iterator status. The result is clearer error messages
and simpler code.
closes #1975
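The conversion follows a single pattern throughout the patch. Roughly,
sketched from the WriterRecordBatch hunk below (details vary per operator):

    // Before: log, notify the executor state, and return the STOP status
    } catch (IOException ex) {
      logger.error("Failure during query", ex);
      kill(false);
      context.getExecutorState().fail(ex);
      return IterOutcome.STOP;
    }

    // After: fail fast with a UserException that carries context
    } catch (IOException e) {
      throw UserException.dataWriteError(e)
          .addContext("Failure when writing the batch")
          .build(logger);
    }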
---
.../common/exceptions/DrillRuntimeException.java | 1 -
.../drill/maprdb/tests/json/TestSimpleJson.java | 18 +-
.../templates/StatisticsRecordWriterImpl.java | 26 +-
.../drill/exec/client/LoggingResultsListener.java | 11 +-
.../exec/physical/config/UnorderedReceiver.java | 1 -
.../drill/exec/physical/impl/BaseRootExec.java | 59 +++-
.../drill/exec/physical/impl/ImplCreator.java | 10 +-
.../apache/drill/exec/physical/impl/RootExec.java | 39 +--
.../apache/drill/exec/physical/impl/ScanBatch.java | 51 ++--
.../drill/exec/physical/impl/ScreenCreator.java | 68 ++---
.../physical/impl/StatisticsWriterRecordBatch.java | 116 ++++----
.../physical/impl/TopN/PriorityQueueTemplate.java | 23 +-
.../drill/exec/physical/impl/TopN/TopNBatch.java | 319 +++++++++++----------
.../exec/physical/impl/WriterRecordBatch.java | 111 +++----
.../exec/physical/impl/aggregate/HashAggBatch.java | 8 +-
.../impl/aggregate/SpilledRecordBatch.java | 20 +-
.../physical/impl/aggregate/StreamingAggBatch.java | 12 +-
.../physical/impl/filter/FilterRecordBatch.java | 3 +-
.../impl/filter/RuntimeFilterBatchCreator.java | 6 +-
.../impl/filter/RuntimeFilterRecordBatch.java | 41 +--
.../exec/physical/impl/join/HashJoinBatch.java | 14 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 7 +-
.../exec/physical/impl/join/RowKeyJoinBatch.java | 10 +-
.../exec/physical/impl/limit/LimitRecordBatch.java | 7 +-
.../impl/limit/PartitionLimitRecordBatch.java | 3 +-
.../impl/mergereceiver/MergingRecordBatch.java | 166 +++++------
.../impl/metadata/MetadataControllerBatch.java | 73 ++---
.../impl/metadata/MetadataHandlerBatch.java | 10 +-
.../OrderedPartitionRecordBatch.java | 37 +--
.../partitionsender/PartitionSenderRootExec.java | 6 +-
.../impl/partitionsender/PartitionerDecorator.java | 18 +-
.../impl/producer/ProducerConsumerBatch.java | 38 +--
.../impl/protocol/OperatorRecordBatch.java | 5 -
.../RangePartitionRecordBatch.java | 2 +-
.../physical/impl/sort/SortRecordBatchBuilder.java | 7 +-
.../impl/statistics/StatisticsMergeBatch.java | 88 +++---
.../impl/svremover/RemovingRecordBatch.java | 3 +-
.../physical/impl/unnest/UnnestRecordBatch.java | 76 +++--
.../unorderedreceiver/UnorderedReceiverBatch.java | 15 +-
.../impl/unpivot/UnpivotMapsRecordBatch.java | 42 +--
.../physical/impl/window/FrameSupportTemplate.java | 41 ++-
.../impl/window/NoFrameSupportTemplate.java | 27 +-
.../impl/window/WindowFrameRecordBatch.java | 31 +-
.../exec/physical/impl/window/WindowFramer.java | 3 +-
.../drill/exec/record/AbstractRecordBatch.java | 10 +-
.../record/AbstractTableFunctionRecordBatch.java | 19 +-
.../exec/record/AbstractUnaryRecordBatch.java | 15 +-
.../drill/exec/record/RecordBatchLoader.java | 3 +-
.../drill/exec/record/SimpleRecordBatch.java | 4 +-
.../exec/record/VectorAccessibleUtilities.java | 7 +
.../exec/record/selection/SelectionVector4.java | 52 ++--
.../drill/exec/store}/StatisticsRecordWriter.java | 21 +-
.../exec/store/easy/json/JsonRecordWriter.java | 10 +-
.../easy/json/JsonStatisticsRecordWriter.java | 27 +-
.../drill/exec/work/fragment/FragmentExecutor.java | 36 ++-
.../drill/exec/physical/impl/MockRecordBatch.java | 5 +-
.../drill/exec/physical/impl/SimpleRootExec.java | 4 +-
.../exec/physical/impl/TestStackAnalyzer.java | 132 +++++++++
.../impl/partitionsender/TestPartitionSender.java | 32 ++-
.../unnest/TestUnnestWithLateralCorrectness.java | 55 ++--
.../drill/exec/server/TestDrillbitResilience.java | 11 +-
.../exec/store/parquet/ParquetResultListener.java | 17 +-
.../org/apache/drill/test/DrillTestWrapper.java | 48 ++--
.../java/org/apache/drill/test/QueryBuilder.java | 41 ++-
.../org/apache/drill/test/QueryRowSetIterator.java | 17 +-
.../org/apache/drill/jdbc/impl/DrillCursor.java | 141 +++++----
66 files changed, 1194 insertions(+), 1185 deletions(-)
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
index c5c7170..35cf586 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
@@ -18,7 +18,6 @@
package org.apache.drill.common.exceptions;
public class DrillRuntimeException extends RuntimeException {
-
private static final long serialVersionUID = -3796081521525479249L;
public DrillRuntimeException() {
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index 9073858..a5be2ab 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -25,7 +25,6 @@ import java.io.InputStream;
import org.apache.drill.PlanTestBase;
import org.apache.drill.SingleRowListener;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -53,6 +52,7 @@ public class TestSimpleJson extends BaseJsonTest {
private static boolean tableCreated = false;
private static String tablePath;
+ @Override
protected String getTablePath() {
return tablePath;
}
@@ -148,16 +148,12 @@ public class TestSimpleJson extends BaseJsonTest {
SingleRowListener listener = new SingleRowListener() {
@Override
protected void rowArrived(QueryDataBatch result) {
- try {
- final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- loader.load(result.getHeader().getDef(), result.getData());
- StringBuilder sb = new StringBuilder();
- VectorUtil.appendVectorAccessibleContent(loader, sb, "|", false);
- loader.clear();
- queryResult.set("result", sb.toString());
- } catch (SchemaChangeException e) {
- queryResult.set("error", "true");
- }
+ final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ loader.load(result.getHeader().getDef(), result.getData());
+ StringBuilder sb = new StringBuilder();
+ VectorUtil.appendVectorAccessibleContent(loader, sb, "|", false);
+ loader.clear();
+ queryResult.set("result", sb.toString());
}
};
testWithListener(QueryType.SQL, sql, listener);
diff --git a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java b/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
index f1c6962..3f8f4e2 100644
--- a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
+++ b/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriterImpl.java
@@ -83,21 +83,16 @@ public class StatisticsRecordWriterImpl {
}
}
- private void initFieldWriters() throws IOException {
+ private void initFieldWriters() {
fieldConverters = Lists.newArrayList();
- try {
- int fieldId = 0;
- for (VectorWrapper w : batch) {
- if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
- continue;
- }
- FieldReader reader = w.getValueVector().getReader();
- FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getName(), reader);
- fieldConverters.add(converter);
+ int fieldId = 0;
+ for (VectorWrapper<?> w : batch) {
+ if (w.getField().getName().equalsIgnoreCase(WriterPrel.PARTITION_COMPARATOR_FIELD)) {
+ continue;
}
- } catch(Exception e) {
- logger.error("Failed to create FieldWriter.", e);
- throw new IOException("Failed to initialize FieldWriters.", e);
+ FieldReader reader = w.getValueVector().getReader();
+ FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getName(), reader);
+ fieldConverters.add(converter);
}
}
@@ -114,10 +109,13 @@ public class StatisticsRecordWriterImpl {
return recordWriter.getNewNullable${minor.class}Converter(fieldId, fieldName, reader);
case REPEATED:
return recordWriter.getNewRepeated${minor.class}Converter(fieldId, fieldName, reader);
+ default:
+ throw new UnsupportedOperationException();
}
</#list>
</#list>
+ default:
+ throw new UnsupportedOperationException();
}
- throw new UnsupportedOperationException();
}
}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
index 951b33d..25e472f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.client.QuerySubmitter.Format;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
@@ -76,13 +75,9 @@ public class LoggingResultsListener implements UserResultsListener {
try {
if (data != null) {
count.addAndGet(header.getRowCount());
- try {
- loader.load(header.getDef(), data);
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- } catch (SchemaChangeException e) {
- submissionFailed(UserException.systemError(e).build(logger));
- }
+ // TODO: Clean: DRILL-2933: That load(...) no longer throws
+ // SchemaChangeException.
+ loader.load(header.getDef(), data);
try {
switch(format) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index 3291283..da6f07a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("unordered-receiver")
public class UnorderedReceiver extends AbstractReceiver{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class);
@JsonCreator
public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 24597ae..89239f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl;
-import java.util.LinkedList;
import java.util.List;
import org.apache.drill.common.DeferredException;
@@ -126,7 +125,7 @@ public abstract class BaseRootExec implements RootExec {
}
@Override
- public void dumpBatches() {
+ public void dumpBatches(Throwable t) {
if (operators == null) {
return;
}
@@ -134,22 +133,18 @@ public abstract class BaseRootExec implements RootExec {
return;
}
- final int numberOfBatchesToDump = 2;
+ CloseableRecordBatch leafMost = findLeaf(operators, t);
+ if (leafMost == null) {
+ // Don't know which batch failed.
+ return;
+ }
+ int batchPosn = operators.indexOf(leafMost);
+ final int numberOfBatchesToDump = Math.min(batchPosn + 1, 2);
logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump);
// As batches are stored in a 'flat' List there is a need to filter out the failed batch
// and a few of its parent (actual number of batches is set by a constant defined above)
- List<CloseableRecordBatch> failedBatchStack = new LinkedList<>();
- for (int i = operators.size() - 1; i >= 0; i--) {
- CloseableRecordBatch batch = operators.get(i);
- if (batch.hasFailed()) {
- failedBatchStack.add(0, batch);
- if (failedBatchStack.size() == numberOfBatchesToDump) {
- break;
- }
- }
- }
- for (CloseableRecordBatch batch : failedBatchStack) {
- batch.dump();
+ for (int i = 0; i < numberOfBatchesToDump; i++) {
+ operators.get(batchPosn--).dump();
}
logger.error("Batch dump completed.");
}
@@ -184,4 +179,38 @@ public abstract class BaseRootExec implements RootExec {
}
}
}
+
+ /**
+ * Given a list of operators and a stack trace, walks the stack trace and
+ * the operator list to find the leaf-most operator, which is the one
+ * that was active when the exception was thrown. Handles the cases in
+ * which no operator was active, each operator had multiple methods on
+ * the stack, or the exception was thrown in some class called by
+ * the operator.
+ * <p>
+ * Not all operators leave a mark in the trace. In particular, if the
+ * call stack passes only through base-class methods, then we have no way to
+ * know the actual class during the call. This is OK because the leaf
+ * methods are just pass-through operations; they are unlikely to fail.
+ *
+ * @param <T> the type of the operator. Parameterized to allow easier
+ * testing
+ * @param dag the list of operators from root-most to leaf-most
+ * @param e the exception thrown somewhere in the operator tree
+ * @return the leaf-most operator, if any
+ */
+ public static <T> T findLeaf(List<T> dag, Throwable e) {
+ StackTraceElement[] trace = e.getStackTrace();
+ for (int i = dag.size() - 1; i >= 0; i--) {
+ T leaf = dag.get(i);
+ String opName = leaf.getClass().getName();
+ for (StackTraceElement element : trace) {
+ String frameName = element.getClassName();
+ if (frameName.contentEquals(opName)) {
+ return leaf;
+ }
+ }
+ }
+ return null;
+ }
}
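A sketch of how findLeaf() is meant to be used (the operator variables here
are hypothetical; only the findLeaf() call itself comes from this patch):

    // Operators are held root-most first, leaf-most last, as in BaseRootExec
    List<CloseableRecordBatch> operators =
        Arrays.asList(screenBatch, filterBatch, scanBatch);
    try {
      // ... drive the operator tree ...
    } catch (Throwable t) {
      // findLeaf() walks the list from the leaf end and returns the first
      // operator whose class name appears in t's stack trace, e.g. scanBatch
      // if ScanBatch.next() was on the stack; null if no operator matches.
      CloseableRecordBatch failing = BaseRootExec.findLeaf(operators, t);
    }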
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 87f22be..a1f25c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -107,8 +107,9 @@ public class ImplCreator {
return null;
}
- /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
-
+ /**
+ * Create RootExec and its children (RecordBatches) for given FragmentRoot
+ */
@SuppressWarnings("unchecked")
private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException {
final List<RecordBatch> childRecordBatches = getChildren(root, context);
@@ -132,8 +133,9 @@ public class ImplCreator {
}
}
-
- /** Create a RecordBatch and its children for given PhysicalOperator */
+ /**
+ * Create a RecordBatch and its children for given PhysicalOperator
+ */
@VisibleForTesting
public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(op);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 34f2131..95607a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -20,34 +20,41 @@ package org.apache.drill.exec.physical.impl;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
/**
- * <h2>Functionality</h2>
- * <p>
- * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- * output nodes and storage nodes. They are there driving force behind the completion of a query.
- * </p>
- * <h2>Assumptions</h2>
- * <p>
- * All implementations of {@link RootExec} assume that all their methods are called by the same thread.
+ * Node which is the last processing node in a query plan. FragmentTerminals
+ * include Exchange output nodes and storage nodes. They are the driving force
+ * behind the completion of a query.
* </p>
+ * All implementations of {@link RootExec} assume that all their
+ * methods are called by the same thread.
*/
public interface RootExec extends AutoCloseable {
+
/**
* Do the next batch of work.
- * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
+ *
+ * @return Whether or not additional batches of work are necessary. False
+ * means that this fragment is done.
*/
boolean next();
/**
- * Inform sender that receiving fragment is finished and doesn't need any more data. This can be called multiple
- * times (once for each downstream receiver). If all receivers are finished then a subsequent call to {@link #next()}
- * will return false.
- * @param handle The handle pointing to the downstream receiver that does not need anymore data.
+ * Inform sender that receiving fragment is finished and doesn't need any more
+ * data. This can be called multiple times (once for each downstream
+ * receiver). If all receivers are finished then a subsequent call to
+ * {@link #next()} will return false.
+ *
+ * @param handle
+ * The handle pointing to the downstream receiver that does not need
+ * anymore data.
*/
void receivingFragmentFinished(FragmentHandle handle);
/**
- * Dump failed batches' state preceded by its parent's state to logs. Invoked when there is a
- * failure during fragment execution.
+ * Dump the failed batch's state, preceded by its parent's state, to the
+ * logs. Invoked when there is a failure during fragment execution.
+ *
+ * @param t the exception thrown by an operator; its stack trace
+ * records which operators were active on the stack
*/
- void dumpBatches();
+ void dumpBatches(Throwable t);
}
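For context, a fragment driver consumes this interface roughly as follows (a
simplified sketch, not the actual FragmentExecutor code):

    RootExec root = ...;
    try {
      while (root.next()) {
        // each call produces and sends one batch of work downstream
      }
    } catch (Throwable t) {
      // t's stack trace identifies the failing operator, so dumpBatches()
      // can log the state of that batch and its parent
      root.dumpBatches(t);
      throw t;
    } finally {
      root.close();
    }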
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f464b27..d314794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
@@ -202,7 +204,7 @@ public class ScanBatch implements CloseableRecordBatch {
* @return whether we could continue iteration
* @throws Exception
*/
- private boolean shouldContinueAfterNoRecords() throws Exception {
+ private boolean shouldContinueAfterNoRecords() {
logger.trace("scan got 0 record.");
if (isRepeatableScan) {
if (!currentReader.hasNext()) {
@@ -212,13 +214,17 @@ public class ScanBatch implements CloseableRecordBatch {
}
return true;
} else { // Regular scan
- currentReader.close();
- currentReader = null;
+ closeCurrentReader();
return true; // In regular case, we always continue the iteration, if no more reader, we will break out at the head of loop
}
}
- private IterOutcome internalNext() throws Exception {
+ private void closeCurrentReader() {
+ AutoCloseables.closeSilently(currentReader);
+ currentReader = null;
+ }
+
+ private IterOutcome internalNext() {
while (true) {
if (currentReader == null && !getNextReaderIfHas()) {
logger.trace("currentReader is null");
@@ -281,18 +287,6 @@ public class ScanBatch implements CloseableRecordBatch {
clearFieldVectorMap();
lastOutcome = IterOutcome.STOP;
throw UserException.memoryError(ex).build(logger);
- } catch (ExecutionSetupException e) {
- if (currentReader != null) {
- try {
- currentReader.close();
- } catch (final Exception e2) {
- logger.error("Close failed for reader " + currentReaderClassName, e2);
- }
- }
- lastOutcome = IterOutcome.STOP;
- throw UserException.internalError(e)
- .addContext("Setup failed for", currentReaderClassName)
- .build(logger);
} catch (UserException ex) {
lastOutcome = IterOutcome.STOP;
throw ex;
@@ -309,15 +303,11 @@ public class ScanBatch implements CloseableRecordBatch {
}
private void clearFieldVectorMap() {
- for (final ValueVector v : mutator.fieldVectorMap().values()) {
- v.clear();
- }
- for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
- v.clear();
- }
+ VectorAccessibleUtilities.clear(mutator.fieldVectorMap().values());
+ VectorAccessibleUtilities.clear(mutator.implicitFieldVectorMap.values());
}
- private boolean getNextReaderIfHas() throws ExecutionSetupException {
+ private boolean getNextReaderIfHas() {
if (!readers.hasNext()) {
return false;
}
@@ -326,8 +316,15 @@ public class ScanBatch implements CloseableRecordBatch {
readers.remove();
}
implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
- currentReader.setup(oContext, mutator);
currentReaderClassName = currentReader.getClass().getSimpleName();
+ try {
+ currentReader.setup(oContext, mutator);
+ } catch (ExecutionSetupException e) {
+ closeCurrentReader();
+ throw UserException.executionError(e)
+ .addContext("Failed to setup reader", currentReaderClassName)
+ .build(logger);
+ }
return true;
}
@@ -405,7 +402,6 @@ public class ScanBatch implements CloseableRecordBatch {
return fqn;
}
-
/**
* Row set mutator implementation provided to record readers created by
* this scan batch. Made visible so that tests can create this mutator
@@ -414,7 +410,6 @@ public class ScanBatch implements CloseableRecordBatch {
* in turn, the only use of the generated vector readers in the vector
* package.)
*/
-
@VisibleForTesting
public static class Mutator implements OutputMutator {
/** Flag keeping track whether top-level schema has changed since last inquiry (via #isNewSchema}).
@@ -589,9 +584,7 @@ public class ScanBatch implements CloseableRecordBatch {
public void close() throws Exception {
container.clear();
mutator.clear();
- if (currentReader != null) {
- currentReader.close();
- }
+ closeCurrentReader();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 625bfb3..765e1de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -83,47 +83,47 @@ public class ScreenCreator implements RootCreator<Screen> {
IterOutcome outcome = next(incoming);
logger.trace("Screen Outcome {}", outcome);
switch (outcome) {
- case STOP:
- return false;
- case NONE:
- if (firstBatch) {
- // this is the only data message sent to the client and may contain the schema
- QueryWritableBatch batch;
- QueryData header = QueryData.newBuilder()
- .setQueryId(context.getHandle().getQueryId())
- .setRowCount(0)
- .setDef(RecordBatchDef.getDefaultInstance())
- .build();
- batch = new QueryWritableBatch(header);
+ case STOP:
+ return false;
+ case NONE:
+ if (firstBatch) {
+ // this is the only data message sent to the client and may contain the schema
+ QueryWritableBatch batch;
+ QueryData header = QueryData.newBuilder()
+ .setQueryId(context.getHandle().getQueryId())
+ .setRowCount(0)
+ .setDef(RecordBatchDef.getDefaultInstance())
+ .build();
+ batch = new QueryWritableBatch(header);
+
+ stats.startWait();
+ try {
+ userConnection.sendData(batch);
+ } finally {
+ stats.stopWait();
+ }
+ firstBatch = false; // we don't really need to set this. But who knows!
+ }
+ return false;
+ case OK_NEW_SCHEMA:
+ materializer = new VectorRecordMaterializer(context, oContext, incoming);
+ //$FALL-THROUGH$
+ case OK:
+ injector.injectPause(context.getExecutionControls(), "sending-data", logger);
+ final QueryWritableBatch batch = materializer.convertNext();
+ updateStats(batch);
stats.startWait();
try {
userConnection.sendData(batch);
} finally {
stats.stopWait();
}
- firstBatch = false; // we don't really need to set this. But who knows!
- }
-
- return false;
- case OK_NEW_SCHEMA:
- materializer = new VectorRecordMaterializer(context, oContext, incoming);
- //$FALL-THROUGH$
- case OK:
- injector.injectPause(context.getExecutionControls(), "sending-data", logger);
- final QueryWritableBatch batch = materializer.convertNext();
- updateStats(batch);
- stats.startWait();
- try {
- userConnection.sendData(batch);
- } finally {
- stats.stopWait();
- }
- firstBatch = false;
-
- return true;
- default:
- throw new UnsupportedOperationException();
+ firstBatch = false;
+
+ return true;
+ default:
+ throw new UnsupportedOperationException(outcome.name());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
index 93aadc6..e54f76d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
@@ -19,6 +19,7 @@
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -36,18 +37,19 @@ import org.apache.drill.exec.store.StatisticsRecordWriterImpl;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsWriterRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(StatisticsWriterRecordBatch.class);
private StatisticsRecordWriterImpl statsRecordWriterImpl;
private StatisticsRecordWriter recordWriter;
- private long counter = 0;
+ private long counter;
private final RecordBatch incoming;
- private boolean processed = false;
+ private boolean processed;
private final String fragmentUniqueId;
private BatchSchema schema;
@@ -91,41 +93,46 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
}
// process the complete upstream in one next() call
IterOutcome upstream;
- try {
- do {
- upstream = next(incoming);
-
- switch(upstream) {
- case STOP:
- return upstream;
-
- case NOT_YET:
- case NONE:
- break;
-
- case OK_NEW_SCHEMA:
- setupNewSchema();
- // $FALL-THROUGH$
- case OK:
+ do {
+ upstream = next(incoming);
+
+ switch(upstream) {
+ case STOP:
+ return upstream;
+
+ case NOT_YET:
+ case NONE:
+ break;
+
+ case OK_NEW_SCHEMA:
+ setupNewSchema();
+ // $FALL-THROUGH$
+ case OK:
+ try {
counter += statsRecordWriterImpl.writeStatistics(incoming.getRecordCount());
- logger.debug("Total records written so far: {}", counter);
-
- for(final VectorWrapper<?> v : incoming) {
- v.getValueVector().clear();
- }
- break;
-
- default:
- throw new UnsupportedOperationException();
- }
- } while(upstream != IterOutcome.NONE);
- // Flush blocking writers now
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failure when writing statistics")
+ .build(logger);
+ }
+ logger.debug("Total records written so far: {}", counter);
+
+ for(final VectorWrapper<?> v : incoming) {
+ v.getValueVector().clear();
+ }
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } while(upstream != IterOutcome.NONE);
+ // Flush blocking writers now
+ try {
statsRecordWriterImpl.flushBlockingWriter();
- } catch(IOException ex) {
- logger.error("Failure during query", ex);
- kill(false);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
+ } catch (IOException ex) {
+ throw UserException.executionError(ex)
+ .addContext("Failure when flushing the block writer")
+ .build(logger);
}
addOutputContainerData();
@@ -154,7 +161,7 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
container.setRecordCount(1);
}
- protected void setupNewSchema() throws IOException {
+ protected void setupNewSchema() {
try {
// update the schema in RecordWriter
stats.startSetup();
@@ -175,33 +182,29 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
stats.stopSetup();
}
- statsRecordWriterImpl = new StatisticsRecordWriterImpl(incoming, recordWriter);
+ try {
+ statsRecordWriterImpl = new StatisticsRecordWriterImpl(incoming, recordWriter);
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failure when creating the statistics record writer")
+ .build(logger);
+ }
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
schema = container.getSchema();
}
- /** Clean up needs to be performed before closing writer. Partially written data will be removed. */
+ /**
+ * Clean up needs to be performed before closing the writer. Partially written
+ * data will be removed.
+ */
private void closeWriter() {
if (recordWriter == null) {
return;
}
- try {
- //Perform any cleanup prior to closing the writer
- recordWriter.cleanup();
- } catch(IOException ex) {
- context.getExecutorState().fail(ex);
- } finally {
- try {
- if (!processed) {
- recordWriter.abort();
- }
- } catch (IOException e) {
- logger.error("Abort failed. There could be leftover output files.", e);
- } finally {
- recordWriter = null;
- }
- }
+ //Perform any cleanup prior to closing the writer
+ recordWriter.cleanup();
+ recordWriter = null;
}
@Override
@@ -209,5 +212,4 @@ public class StatisticsWriterRecordBatch extends AbstractRecordBatch<Writer> {
closeWriter();
super.close();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 3ffa1db..0719f0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
@@ -37,9 +36,19 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class PriorityQueueTemplate implements PriorityQueue {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
+ private static final Logger logger = LoggerFactory.getLogger(PriorityQueueTemplate.class);
+
+ /**
+ * The estimated maximum queue size used when allocating the SV4
+ * for the queue. If the queue is larger, then a) we should probably
+ * be using a sort instead of top N, and b) the code will automatically
+ * grow the SV4 as needed up to the max supported size.
+ */
+ public static final int EST_MAX_QUEUE_SIZE = 4000;
// This holds the min heap of the record indexes. Heapify condition is based on actual record though. Only records
// meeting the heap condition have their indexes in this heap. Actual record are stored inside the hyperBatch. Since
@@ -54,8 +63,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
// Limit determines the number of record to output and hold in queue.
private int limit;
- private int queueSize = 0;
- private int batchCount = 0;
+ private int queueSize;
+ private int batchCount;
private boolean hasSv2;
@Override
@@ -142,11 +151,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
public void generate() {
Stopwatch watch = Stopwatch.createStarted();
final DrillBuf drillBuf = allocator.buffer(4 * queueSize);
- try {
- finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
- } catch (SchemaChangeException e) {
- throw AbstractRecordBatch.schemaChangeException(e, "Priority Queue", logger);
- }
+ finalSv4 = new SelectionVector4(drillBuf, queueSize, EST_MAX_QUEUE_SIZE);
for (int i = queueSize - 1; i >= 0; i--) {
finalSv4.set(i, pop());
}
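A note on the sizing above (an interpretation of the code, not text from the
patch): an SV4 stores one four-byte record index per entry, hence the buffer
of 4 * queueSize bytes, and EST_MAX_QUEUE_SIZE is only the estimated ceiling
handed to the SelectionVector4 constructor; the vector can still grow past
it. For example, with a queue of 1000 records:

    DrillBuf drillBuf = allocator.buffer(4 * 1000);  // 4 bytes per index
    SelectionVector4 sv4 = new SelectionVector4(drillBuf, 1000, EST_MAX_QUEUE_SIZE);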
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 33b2196..d600f12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -17,13 +17,13 @@
*/
package org.apache.drill.exec.physical.impl.TopN;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -31,7 +31,6 @@ import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -79,12 +78,14 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
/**
- * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort
- * it doesn't have to store all the input data to sort it first and then apply limit on the sorted data. Instead
- * internally it maintains a priority queue backed by a heap with the size being same as limit value.
+ * Operator Batch which implements the TopN functionality. It is more efficient
+ * than (sort + limit) since unlike sort it doesn't have to store all the input
+ * data to sort it first and then apply limit on the sorted data. Instead
+ * internally it maintains a priority queue backed by a heap whose size is
+ * the same as the limit value.
*/
public class TopNBatch extends AbstractRecordBatch<TopN> {
- static final Logger logger = LoggerFactory.getLogger(TopNBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(TopNBatch.class);
private final MappingSet mainMapping = createMainMappingSet();
private final MappingSet leftMapping = createLeftMappingSet();
@@ -139,7 +140,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public void buildSchema() throws SchemaChangeException {
+ public void buildSchema() {
IterOutcome outcome = next(incoming);
switch (outcome) {
case OK:
@@ -184,157 +185,157 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
// Reset the TopN state for next iteration
resetTopNState();
- try {
- boolean incomingHasSv2 = false;
- switch (incoming.getSchema().getSelectionVectorMode()) {
- case NONE: {
- break;
- }
- case TWO_BYTE: {
- incomingHasSv2 = true;
- break;
- }
- case FOUR_BYTE: {
- throw new SchemaChangeException("TopN doesn't support incoming with SV4 mode");
- }
- default:
- throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
+ boolean incomingHasSv2 = false;
+ switch (incoming.getSchema().getSelectionVectorMode()) {
+ case NONE: {
+ break;
+ }
+ case TWO_BYTE: {
+ incomingHasSv2 = true;
+ break;
}
+ case FOUR_BYTE: {
+ throw UserException.internalError(null)
+ .message("TopN doesn't support incoming with SV4 mode")
+ .build(logger);
+ }
+ default:
+ throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
+ }
- outer: while (true) {
- Stopwatch watch = Stopwatch.createStarted();
- if (first) {
- lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
- // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
- sv4 = new SelectionVector4(context.getAllocator(), 0);
- first = false;
- } else {
- lastKnownOutcome = next(incoming);
- }
- if (lastKnownOutcome == OK && schema == null) {
- lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
- container.clear();
- }
- logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
- switch (lastKnownOutcome) {
- case NONE:
- break outer;
- case NOT_YET:
- throw new UnsupportedOperationException();
- case STOP:
- return lastKnownOutcome;
- case OK_NEW_SCHEMA:
- // only change in the case that the schema truly changes. Artificial schema changes are ignored.
- // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
- // is enabled it will be handled.
- container.clear();
- firstBatchForSchema = true;
- if (!incoming.getSchema().equals(schema)) {
- if (schema != null) {
- if (!unionTypeEnabled) {
- throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
- "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
- ExecConstants.ENABLE_UNION_TYPE_KEY));
- } else {
- this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
- purgeAndResetPriorityQueue();
- this.schemaChanged = true;
- }
+ outer: while (true) {
+ Stopwatch watch = Stopwatch.createStarted();
+ if (first) {
+ lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+ // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
+ sv4 = new SelectionVector4(context.getAllocator(), 0);
+ first = false;
+ } else {
+ lastKnownOutcome = next(incoming);
+ }
+ if (lastKnownOutcome == OK && schema == null) {
+ lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+ container.clear();
+ }
+ logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
+ switch (lastKnownOutcome) {
+ case NONE:
+ break outer;
+ case NOT_YET:
+ throw new UnsupportedOperationException();
+ case STOP:
+ return lastKnownOutcome;
+ case OK_NEW_SCHEMA:
+ // only change in the case that the schema truly changes. Artificial schema changes are ignored.
+ // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
+ // is enabled it will be handled.
+ container.clear();
+ firstBatchForSchema = true;
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ if (!unionTypeEnabled) {
+ throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
+ "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
+ ExecConstants.ENABLE_UNION_TYPE_KEY));
} else {
- this.schema = incoming.getSchema();
+ schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
+ purgeAndResetPriorityQueue();
+ schemaChanged = true;
}
- }
- // fall through.
- case OK:
- case EMIT:
- if (incoming.getRecordCount() == 0) {
- for (VectorWrapper<?> w : incoming) {
- w.clear();
- }
- // Release memory for incoming SV2 vector
- if (incomingHasSv2) {
- incoming.getSelectionVector2().clear();
- }
- break;
- }
- countSincePurge += incoming.getRecordCount();
- batchCount++;
- RecordBatchData batch;
- if (schemaChanged) {
- batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
} else {
- batch = new RecordBatchData(incoming, oContext.getAllocator());
+ schema = incoming.getSchema();
}
- boolean success = false;
- try {
- if (priorityQueue == null) {
- priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
- } else if (!priorityQueue.isInitialized()) {
- // means priority queue is cleaned up after producing output for first record boundary. We should
- // initialize it for next record boundary
- priorityQueue.init(config.getLimit(), oContext.getAllocator(),
- schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
- }
- priorityQueue.add(batch);
- // Based on static threshold of number of batches, perform purge operation to release the memory for
- // RecordBatches which are of no use or doesn't fall under TopN category
- if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
- purge();
- countSincePurge = 0;
- batchCount = 0;
- }
- success = true;
- } finally {
- if (!success) {
- batch.clear();
- }
+ }
+ // fall through.
+ case OK:
+ case EMIT:
+ if (incoming.getRecordCount() == 0) {
+ for (VectorWrapper<?> w : incoming) {
+ w.clear();
+ }
+ // Release memory for incoming SV2 vector
+ if (incomingHasSv2) {
+ incoming.getSelectionVector2().clear();
}
break;
- default:
- throw new UnsupportedOperationException();
}
-
- // If the last seen outcome is EMIT then break the loop. We do it here since we want to process the batch
- // with records and EMIT outcome in above case statements
- if (lastKnownOutcome == EMIT) {
- break;
+ countSincePurge += incoming.getRecordCount();
+ batchCount++;
+ RecordBatchData batch;
+ if (schemaChanged) {
+ batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
+ } else {
+ batch = new RecordBatchData(incoming, oContext.getAllocator());
+ }
+ boolean success = false;
+ try {
+ if (priorityQueue == null) {
+ priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
+ } else if (!priorityQueue.isInitialized()) {
+ // means priority queue is cleaned up after producing output for first record boundary. We should
+ // initialize it for next record boundary
+ priorityQueue.init(config.getLimit(), oContext.getAllocator(),
+ schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
+ }
+ priorityQueue.add(batch);
+ // Based on static threshold of number of batches, perform purge operation to release the memory for
+ // RecordBatches which are of no use or doesn't fall under TopN category
+ if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
+ purge();
+ countSincePurge = 0;
+ batchCount = 0;
+ }
+ success = true;
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
+ } finally {
+ if (!success) {
+ batch.clear();
+ }
}
+ break;
+ default:
+ throw new UnsupportedOperationException();
}
- // PriorityQueue can be null here if first batch is received with OK_NEW_SCHEMA and is empty and second next()
- // call returned NONE or EMIT.
- // PriorityQueue can be uninitialized here if only empty batch is received between 2 EMIT outcome.
- if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
- // builder may be null at this point if the first incoming batch is empty
- return handleEmptyBatches(lastKnownOutcome);
+ // If the last seen outcome is EMIT then break the loop. We do it here since we want to process the batch
+ // with records and EMIT outcome in above case statements
+ if (lastKnownOutcome == EMIT) {
+ break;
}
+ }
- priorityQueue.generate();
- prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
-
- // With EMIT outcome control will come here multiple times whereas without EMIT outcome control will only come
- // here once. In EMIT outcome case if there is schema change in any iteration then that will be handled by
- // lastKnownOutcome.
- return getFinalOutcome();
- } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
+ // PriorityQueue can be null here if first batch is received with OK_NEW_SCHEMA and is empty and second next()
+ // call returned NONE or EMIT.
+ // PriorityQueue can be uninitialized here if only empty batch is received between 2 EMIT outcome.
+ if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
+ // builder may be null at this point if the first incoming batch is empty
+ return handleEmptyBatches(lastKnownOutcome);
}
+
+ priorityQueue.generate();
+ prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
+
+ // With EMIT outcome control will come here multiple times whereas without EMIT outcome control will only come
+ // here once. In EMIT outcome case if there is schema change in any iteration then that will be handled by
+ // lastKnownOutcome.
+ return getFinalOutcome();
}
/**
- * When PriorityQueue is built up then it stores the list of limit number of record indexes (in heapSv4) which falls
- * under TopN category. But it also stores all the incoming RecordBatches with all records inside a HyperContainer
- * (hyperBatch). When a certain threshold of batches are reached then this method is called which copies the limit
- * number of records whose indexes are stored in heapSv4 out of HyperBatch to a new VectorContainer and releases
- * all other records and their batches. Later this new VectorContainer is stored inside the HyperBatch and it's
- * corresponding indexes are stored in the heapSv4 vector. This is done to avoid holding up lot's of Record Batches
- * which can create OutOfMemory condition.
- * @throws SchemaChangeException
+ * When the PriorityQueue is built up, it stores (in heapSv4) the indexes of
+ * the limit number of records which fall under the TopN category. But it also
+ * stores all the incoming RecordBatches with all records inside a
+ * HyperContainer (hyperBatch). When a certain threshold of batches is
+ * reached, this method is called; it copies the limit number of records
+ * whose indexes are stored in heapSv4 out of the HyperBatch to a new
+ * VectorContainer and releases all other records and their batches. Later
+ * this new VectorContainer is stored inside the HyperBatch and its
+ * corresponding indexes are stored in the heapSv4 vector. This is done to
+ * avoid holding onto lots of RecordBatches, which can create an OutOfMemory
+ * condition.
*/
- private void purge() throws SchemaChangeException {
+ private void purge() {
Stopwatch watch = Stopwatch.createStarted();
VectorContainer c = priorityQueue.getHyperBatch();
@@ -362,7 +363,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
// HyperContainer backing the priority queue out of it
VectorContainer newQueue = new VectorContainer();
builder.build(newQueue);
- priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+ try {
+ priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
+ }
builder.getSv4().clear();
} finally {
DrillAutoCloseables.closeNoChecked(builder);
@@ -370,8 +375,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
}
- private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
- throws SchemaChangeException, ClassTransformationException, IOException {
+ private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) {
return createNewPriorityQueue(
mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, unionTypeEnabled,
codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode(), context);
@@ -392,8 +396,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
public static PriorityQueue createNewPriorityQueue(
MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump,
- int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context)
- throws ClassTransformationException, IOException, SchemaChangeException {
+ int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context) {
OptionSet optionSet = context.getOptions();
FunctionLookupContext functionLookupContext = context.getFunctionRegistry();
CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
@@ -407,11 +410,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(),
- batch, collector, functionLookupContext, unionTypeEnabled);
- if (collector.hasErrors()) {
- throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- }
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, functionLookupContext, unionTypeEnabled);
+ collector.reportErrors(logger);
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
g.setMappingSet(rightMapping);
@@ -436,7 +436,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
g.getEvalBlock()._return(JExpr.lit(0));
PriorityQueue q = context.getImplementationClass(cg);
- q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
+ try {
+ q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
+ } catch (SchemaChangeException e) {
+ throw TopNBatch.schemaChangeException(e, "Top N", logger);
+ }
return q;
}
@@ -445,9 +449,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
* 1. Purge existing batches
* 2. Promote newly created container for new schema.
* 3. Recreate priority queue and reset with coerced container.
- * @throws SchemaChangeException
*/
- public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
+ public void purgeAndResetPriorityQueue() {
final Stopwatch watch = Stopwatch.createStarted();
final VectorContainer c = priorityQueue.getHyperBatch();
final VectorContainer newContainer = new VectorContainer(oContext);
@@ -465,7 +468,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
priorityQueue.cleanup();
priorityQueue = createNewPriorityQueue(newSchemaContainer, config.getLimit());
- priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
+ try {
+ priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
+ }
} finally {
builder.clear();
builder.close();
@@ -484,7 +491,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
/**
- * Resets TopNBatch state to process next incoming batches independent of already seen incoming batches.
+ * Resets TopNBatch state to process next incoming batches independent of
+ * already seen incoming batches.
*/
private void resetTopNState() {
lastKnownOutcome = OK;
@@ -548,8 +556,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
* @param batchBuilder - Builder to build hyper vectors batches
* @throws SchemaChangeException
*/
- private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder)
- throws SchemaChangeException {
+ private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder) {
final VectorContainer c = priorityQueue.getHyperBatch();
final SelectionVector4 queueSv4 = priorityQueue.getSv4();
final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
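For intuition about the TopN-versus-sort tradeoff described in the TopNBatch
Javadoc above, here is a standalone sketch using only the JDK (not Drill
code): keeping the top 3 values of a stream with a min-heap of size 3.

    java.util.PriorityQueue<Integer> heap = new java.util.PriorityQueue<>();
    int limit = 3;
    for (int v : new int[] {5, 1, 9, 3, 7}) {
      if (heap.size() < limit) {
        heap.add(v);
      } else if (heap.peek() < v) {
        heap.poll();   // evict the current minimum
        heap.add(v);   // admit the larger value
      }
    }
    // heap now holds {5, 7, 9}: the top 'limit' values, found without
    // buffering and sorting the entire input as (sort + limit) would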
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 60a6cf6..1b368d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl;
import java.io.IOException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -37,25 +38,29 @@ import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/* Write the RecordBatch to the given RecordWriter. */
+/** Write the RecordBatch to the given RecordWriter. */
public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(WriterRecordBatch.class);
private EventBasedRecordWriter eventBasedRecordWriter;
private RecordWriter recordWriter;
- private long counter = 0;
+ private long counter;
private final RecordBatch incoming;
- private boolean processed = false;
+ private boolean processed;
private final String fragmentUniqueId;
private BatchSchema schema;
- public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+ public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context,
+ RecordWriter recordWriter) throws OutOfMemoryException {
super(writer, context, false);
this.incoming = incoming;
final FragmentHandle handle = context.getHandle();
- fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+ fragmentUniqueId = String.format(
+ "%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
this.recordWriter = recordWriter;
}
@@ -77,7 +82,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
@Override
public IterOutcome innerNext() {
if (processed) {
-// cleanup();
// if the upstream record batch is already processed and next() is called by
// downstream then return NONE to indicate completion
return IterOutcome.NONE;
@@ -85,46 +89,45 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
// process the complete upstream in one next() call
IterOutcome upstream;
- try {
- do {
- upstream = next(incoming);
-
- switch(upstream) {
- case STOP:
- return upstream;
-
- case NOT_YET:
+ do {
+ upstream = next(incoming);
+
+ switch(upstream) {
+ case STOP:
+ return upstream;
+
+ case NOT_YET:
+ break;
+ case NONE:
+ if (schema != null) {
+ // Schema is for the output batch schema which is setup in setupNewSchema(). Since the output
+ // schema is fixed ((Fragment(VARCHAR), Number of records written (BIGINT)) we should set it
+ // up even with 0 records for it to be reported back to the client.
break;
- case NONE:
- if (schema != null) {
- // Schema is for the output batch schema which is setup in setupNewSchema(). Since the output
- // schema is fixed ((Fragment(VARCHAR), Number of records written (BIGINT)) we should set it
- // up even with 0 records for it to be reported back to the client.
- break;
- }
-
- case OK_NEW_SCHEMA:
- setupNewSchema();
- // $FALL-THROUGH$
- case OK:
- counter += eventBasedRecordWriter.write(incoming.getRecordCount());
- logger.debug("Total records written so far: {}", counter);
+ }
- for(final VectorWrapper<?> v : incoming) {
- v.getValueVector().clear();
- }
- break;
-
- default:
- throw new UnsupportedOperationException();
- }
- } while(upstream != IterOutcome.NONE);
- } catch(IOException ex) {
- logger.error("Failure during query", ex);
- kill(false);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
+ case OK_NEW_SCHEMA:
+ setupNewSchema();
+ // $FALL-THROUGH$
+ case OK:
+ try {
+ counter += eventBasedRecordWriter.write(incoming.getRecordCount());
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failure when writing the batch")
+ .build(logger);
+ }
+ logger.debug("Total records written so far: {}", counter);
+
+ for (final VectorWrapper<?> v : incoming) {
+ v.getValueVector().clear();
+ }
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } while (upstream != IterOutcome.NONE);
addOutputContainerData();
processed = true;
@@ -152,11 +155,17 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
container.setRecordCount(1);
}
- protected void setupNewSchema() throws IOException {
+ protected void setupNewSchema() {
try {
// update the schema in RecordWriter
stats.startSetup();
- recordWriter.updateSchema(incoming);
+ try {
+ recordWriter.updateSchema(incoming);
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failure updating record writer schema")
+ .build(logger);
+ }
// Create two vectors for:
// 1. Fragment unique id.
// 2. Summary: currently contains number of records written.
@@ -173,7 +182,13 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
stats.stopSetup();
}
- eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
+ try {
+ eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failed to create the event record writer")
+ .build(logger);
+ }
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
}
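The hunks above capture the heart of DRILL-7576: an IOException from the record writer now surfaces immediately as a UserException instead of being logged, converted to STOP, and handed to the executor state. Below is a minimal standalone sketch of that fail-fast pattern, not Drill source: BatchWriter and writeOrFail are illustrative stand-ins, while dataWriteError(), addContext() and build(logger) are the calls used in the diff.

    import java.io.IOException;
    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class FailFastWriteSketch {
      private static final Logger logger = LoggerFactory.getLogger(FailFastWriteSketch.class);

      interface BatchWriter { // stand-in for EventBasedRecordWriter
        int write(int recordCount) throws IOException;
      }

      static int writeOrFail(BatchWriter writer, int recordCount) {
        try {
          return writer.write(recordCount);
        } catch (IOException e) {
          // Before: context.getExecutorState().fail(e); return IterOutcome.STOP;
          // After: one throw; the exception carries its own context and logging.
          throw UserException.dataWriteError(e)
              .addContext("Failure when writing the batch")
              .build(logger);
        }
      }
    }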
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b8f16d8..506b594 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -340,16 +340,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
case UPDATE_AGGREGATOR:
- context.getExecutorState().fail(UserException.unsupportedError()
+ throw UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged(
"Hash aggregate does not support schema change",
incomingSchema,
incoming.getSchema()).getMessage())
- .build(logger));
- close();
- killIncoming(false);
- firstBatch = false;
- return IterOutcome.STOP;
+ .build(logger);
default:
throw new IllegalStateException(String.format("Unknown state %s.", out));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
index 33cad10..586d34d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordBatch.java
@@ -39,10 +39,9 @@ import java.io.InputStream;
import java.util.Iterator;
/**
- * A class to replace "incoming" - instead scanning a spilled partition file
+ * Replaces "incoming" - instead scanning a spilled partition file
*/
public class SpilledRecordBatch implements CloseableRecordBatch {
-
private static final Logger logger = LoggerFactory.getLogger(SpilledRecordBatch.class);
private VectorContainer container;
@@ -137,13 +136,13 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
context.getExecutorState().checkContinue();
- if ( spilledBatches <= 0 ) { // no more batches to read in this partition
+ if (spilledBatches <= 0) { // no more batches to read in this partition
this.close();
lastOutcome = IterOutcome.NONE;
return lastOutcome;
}
- if ( spillStream == null ) {
+ if (spillStream == null) {
lastOutcome = IterOutcome.STOP;
throw new IllegalStateException("Spill stream was null");
}
@@ -153,7 +152,7 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
}
try {
- if ( container.getNumberOfColumns() > 0 ) { // container already initialized
+ if (container.getNumberOfColumns() > 0) { // container already initialized
// Pass our container to the reader because other classes (e.g. HashAggBatch, HashTable)
// may have a reference to this container (as an "incoming")
vas.readFromStreamWithContainer(container, spillStream);
@@ -163,11 +162,12 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
container = vas.get();
}
} catch (IOException e) {
- lastOutcome = IterOutcome.STOP;
- throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(logger);
+ throw UserException.dataReadError(e)
+ .addContext("Failed reading from a spill file")
+ .build(logger);
} catch (Exception e) {
- lastOutcome = IterOutcome.STOP;
- throw e;
+ // TODO: Catch the error closer to the cause and create a better error message.
+ throw UserException.executionError(e).build(logger);
}
spilledBatches--; // one less batch to read
@@ -206,7 +206,7 @@ public class SpilledRecordBatch implements CloseableRecordBatch {
spillSet.delete(spillFile);
}
catch (IOException e) {
- /* ignore */
+ // ignore
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 11261cf..476b316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -21,7 +21,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
import java.util.List;
@@ -357,13 +356,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
lastKnownOutcome = EMIT;
return OK_NEW_SCHEMA;
} else {
- context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
- .schemaChanged("Streaming aggregate does not support schema changes", incomingSchema,
- incoming.getSchema()).getMessage()).build(logger));
- close();
- killIncoming(false);
- lastKnownOutcome = STOP;
- return IterOutcome.STOP;
+ throw UserException.schemaChangeError(SchemaChangeException.schemaChanged(
+ "Streaming aggregate does not support schema changes", incomingSchema,
+ incoming.getSchema()))
+ .build(logger);
}
default:
throw new IllegalStateException(String.format("Unknown state %s.", aggOutcome));
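Both aggregate operators now report an unsupported schema change with a single throw in place of the old fail-close-kill-return-STOP choreography. A hedged sketch of the resulting control flow, assuming only the builder calls shown in the hunks above; failOnSchemaChange is an illustrative helper, not a Drill method:

    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.exec.exception.SchemaChangeException;
    import org.apache.drill.exec.record.BatchSchema;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class SchemaChangeFailSketch {
      private static final Logger logger = LoggerFactory.getLogger(SchemaChangeFailSketch.class);

      static void failOnSchemaChange(BatchSchema oldSchema, BatchSchema newSchema) {
        // One throw replaces fail(), close(), killIncoming(false) and IterOutcome.STOP.
        throw UserException.schemaChangeError(SchemaChangeException.schemaChanged(
                "Streaming aggregate does not support schema changes",
                oldSchema, newSchema))
            .build(logger);
      }
    }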
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index b03e25f..295cfd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -132,8 +132,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.TWO_BYTE);
return true;
+ } else {
+ return false;
}
- return false;
}
protected Filterer generateSV4Filterer() throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
index 65605f2..89449b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterBatchCreator.java
@@ -27,9 +27,11 @@ import org.apache.drill.exec.record.RecordBatch;
import java.util.List;
-public class RuntimeFilterBatchCreator implements BatchCreator<RuntimeFilterPOP>{
+public class RuntimeFilterBatchCreator implements BatchCreator<RuntimeFilterPOP> {
@Override
- public CloseableRecordBatch getBatch(ExecutorFragmentContext context, RuntimeFilterPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ RuntimeFilterPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new RuntimeFilterRecordBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 28de51f..8b61d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -75,7 +75,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
private final long maxWaitingTime;
private final long rfIdentifier;
- public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+ public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming,
+ FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
enableRFWaiting = context.getOptions().getBoolean(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY);
maxWaitingTime = context.getOptions().getLong(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY);
@@ -106,11 +107,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
protected IterOutcome doWork() {
originalRecordCount = incoming.getRecordCount();
sv2.setBatchActualRecordCount(originalRecordCount);
- try {
- applyRuntimeFilter();
- } catch (SchemaChangeException e) {
- throw new UnsupportedOperationException(e);
- }
+ applyRuntimeFilter();
container.transferIn(incoming.getContainer());
container.setRecordCount(originalRecordCount);
updateStats();
@@ -129,7 +126,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
if (sv2 != null) {
sv2.clear();
}
@@ -168,8 +165,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
}
/**
- * Takes care of setting up HashHelper if RuntimeFilter is received and the HashHelper is not already setup. For each
- * schema change hash64 should be reset and this method needs to be called again.
+ * Takes care of setting up HashHelper if RuntimeFilter is received and the
+ * HashHelper is not already set up. For each schema change hash64 should be
+ * reset and this method needs to be called again.
*/
private void setupHashHelper() {
current = context.getRuntimeFilter(rfIdentifier);
@@ -196,7 +194,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
hashFieldExps.add(toHashFieldExp);
}
- hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+ hash64 = hashHelper.getHash64(hashFieldExps.toArray(
+ new LogicalExpression[hashFieldExps.size()]),
+ typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
} catch (Exception e) {
throw UserException.internalError(e).build(logger);
}
@@ -204,12 +204,12 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
}
/**
- * If RuntimeFilter is available then applies the filter condition on the incoming batch records and creates an SV2
- * to store indexes which passes the filter condition. In case when RuntimeFilter is not available it just pass
+ * If a RuntimeFilter is available, applies the filter condition to the
+ * incoming batch records and creates an SV2 to store the indexes that pass the
+ * filter condition. When a RuntimeFilter is not available it just passes
* through all the records from incoming batch to downstream.
- * @throws SchemaChangeException
*/
- private void applyRuntimeFilter() throws SchemaChangeException {
+ private void applyRuntimeFilter() {
if (originalRecordCount <= 0) {
sv2.setRecordCount(0);
return;
@@ -238,7 +238,12 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
String fieldName = toFilterFields.get(0);
int fieldId = field2id.get(fieldName);
for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
- long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+ long hash;
+ try {
+ hash = hash64.hash64Code(rowIndex, 0, fieldId);
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException(e);
+ }
boolean contain = bloomFilter.find(hash);
if (contain) {
sv2.setIndex(svIndex, rowIndex);
@@ -251,7 +256,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
for (int i = 0; i < toFilterFields.size(); i++) {
BloomFilter bloomFilter = bloomFilters.get(i);
String fieldName = toFilterFields.get(i);
- computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+ try {
+ computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException(e);
+ }
}
for (int i = 0; i < originalRecordCount; i++) {
boolean contain = bitSet.get(i);
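Note how the try blocks are narrowed here: the checked SchemaChangeException is wrapped exactly where hash64Code(...) can throw it, rather than around all of applyRuntimeFilter(). A small sketch of that call-site wrapping; Hasher is a stand-in for the generated hash64 object:

    import org.apache.drill.exec.exception.SchemaChangeException;

    class NarrowWrapSketch {
      interface Hasher { // stand-in for the generated Hash64 implementation
        long hash64Code(int rowIndex, int seed, int fieldId) throws SchemaChangeException;
      }

      static long hashRow(Hasher hash64, int rowIndex, int fieldId) {
        try {
          return hash64.hash64Code(rowIndex, 0, fieldId);
        } catch (SchemaChangeException e) {
          // A schema change is not expected at this point; treat it as unsupported.
          throw new UnsupportedOperationException(e);
        }
      }
    }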
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5edea2c..eeedc72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -96,7 +96,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class implements the runtime execution for the Hash-Join operator
+ * Implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
* <p>
* This implementation splits the incoming Build side rows into multiple
@@ -125,7 +125,6 @@ import org.slf4j.LoggerFactory;
* greater) is a waste, indicating that the number of partitions chosen was too
* small.
*/
-
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
@@ -687,12 +686,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
* @param isLeft is it the left or right
*/
private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
- batch.kill(true);
- while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
- VectorAccessibleUtilities.clear(batch);
- upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
- }
+ batch.kill(true);
+ while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+ VectorAccessibleUtilities.clear(batch);
+ upstream = next(isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+ }
}
+
private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 5c9525c..6eeacd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -27,6 +27,7 @@ import com.sun.codemodel.JExpr;
import com.sun.codemodel.JMod;
import com.sun.codemodel.JVar;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -200,8 +201,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
case FAILURE:
status.left.clearInflightBatches();
status.right.clearInflightBatches();
- kill(false);
- return IterOutcome.STOP;
+ // Should handle at the source of the error to provide a better error message.
+ throw UserException.executionError(null)
+ .message("Merge failed")
+ .build(logger);
case WAITING:
return IterOutcome.NOT_YET;
default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index e580dfa..bc8c53b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -36,10 +35,11 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implements RowKeyJoin {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RowKeyJoinBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(RowKeyJoinBatch.class);
// primary table side record batch
private final RecordBatch left;
@@ -51,7 +51,7 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
private IterOutcome leftUpstream = IterOutcome.NONE;
private IterOutcome rightUpstream = IterOutcome.NONE;
private final List<TransferPair> transfers = Lists.newArrayList();
- private int recordCount = 0;
+ private int recordCount;
private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private RowKeyJoinState rkJoinState = RowKeyJoinState.INITIAL;
@@ -82,7 +82,7 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
}
@Override
- protected void buildSchema() throws SchemaChangeException {
+ protected void buildSchema() {
container.clear();
rightUpstream = next(right);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 0bbaba1..bb5b38d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -114,7 +113,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
container.clear();
transfers.clear();
@@ -139,9 +138,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
if (container.isSchemaChanged()) {
container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
return true;
+ } else {
+ return false;
}
-
- return false;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index 48264c6..a466e43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -22,7 +22,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.PartitionLimit;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -84,7 +83,7 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
container.clear();
transfers.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8845e7b..ecac4ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.PriorityQueue;
import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -82,7 +82,7 @@ import com.sun.codemodel.JExpr;
import io.netty.buffer.ByteBuf;
/**
- * The MergingRecordBatch merges pre-sorted record batches from remote senders.
+ * Merges pre-sorted record batches from remote senders.
*/
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class);
@@ -134,19 +134,27 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
}
- private RawFragmentBatch getNext(final int providerIndex) throws IOException {
+ private RawFragmentBatch getNext(final int providerIndex) {
stats.startWait();
final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try {
injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
- final RawFragmentBatch b = provider.getNext();
+ RawFragmentBatch b;
+ try {
+ b = provider.getNext();
+ } catch (IOException e) {
+ // TODO: Better to handle inside getNext() to provide a better error message
+ throw UserException.dataReadError(e)
+ .addContext("Failed to read incoming merge batch")
+ .build(logger);
+ }
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
}
return b;
- } catch(final InterruptedException e) {
+ } catch (final InterruptedException e) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
@@ -203,12 +211,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatch = tempBatchHolder[p];
tempBatchHolder[p] = null;
} else {
- try {
- rawBatch = getNext(p);
- } catch (final IOException e) {
- context.getExecutorState().fail(e);
- return IterOutcome.STOP;
- }
+ rawBatch = getNext(p);
}
checkContinue();
@@ -231,18 +234,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatches.add(rawBatch);
} else {
// keep reading till we get a batch with record count > 0 or we have no more batches to read i.e. we get null
- try {
- while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
- // Do nothing
- }
- if (rawBatch == null) {
- checkContinue();
- createDummyBatch = true;
- }
- } catch (final IOException e) {
- context.getExecutorState().fail(e);
- clearBatches(rawBatches);
- return IterOutcome.STOP;
+ while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+ // Do nothing
+ }
+ if (rawBatch == null) {
+ checkContinue();
+ createDummyBatch = true;
}
if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
createDummyBatch = true;
@@ -307,15 +304,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
for (final RawFragmentBatch batch : incomingBatches) {
// initialize the incoming batchLoaders
final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
- try {
- batchLoaders[i].load(rbd, batch.getBody());
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- } catch(final SchemaChangeException e) {
- logger.error("MergingReceiver failed to load record batch from remote host. {}", e);
- context.getExecutorState().fail(e);
- return IterOutcome.STOP;
- }
+ // TODO: Clean: DRILL-2933: load(...) no longer throws SchemaChangeException
+ batchLoaders[i].load(rbd, batch.getBody());
batch.release();
++batchOffsets[i];
++i;
@@ -325,10 +315,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// Ensure all the incoming batches have the identical schema.
// Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches.
- if (!isSameSchemaAmongBatches(batchLoaders)) {
- context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
- return IterOutcome.STOP;
- }
+ checkSameSchemaAmongBatches(batchLoaders);
// create the outgoing schema and vector container, and allocate the initial batch
final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -364,19 +351,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// populate the priority queue with initial values
for (int b = 0; b < senderCount; ++b) {
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
- try {
- final RawFragmentBatch batch = getNext(b);
- incomingBatches[b] = batch;
- if (batch != null) {
- batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
- } else {
- batchLoaders[b].clear();
- batchLoaders[b] = null;
- checkContinue();
- }
- } catch (IOException | SchemaChangeException e) {
- context.getExecutorState().fail(e);
- return IterOutcome.STOP;
+ final RawFragmentBatch batch = getNext(b);
+ incomingBatches[b] = batch;
+ if (batch != null) {
+ batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
+ } else {
+ batchLoaders[b].clear();
+ batchLoaders[b] = null;
+ checkContinue();
}
}
if (batchLoaders[b] != null) {
@@ -399,21 +381,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
RawFragmentBatch nextBatch;
- try {
- nextBatch = getNext(node.batchId);
+ nextBatch = getNext(node.batchId);
- while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
- nextBatch = getNext(node.batchId);
- }
+ while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
+ nextBatch = getNext(node.batchId);
+ }
- assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
- : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
- if (nextBatch == null) {
- checkContinue();
- }
- } catch (final IOException e) {
- context.getExecutorState().fail(e);
- return IterOutcome.STOP;
+ assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
+ : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
+ if (nextBatch == null) {
+ checkContinue();
}
incomingBatches[node.batchId] = nextBatch;
@@ -441,14 +418,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
- try {
- batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- } catch(final SchemaChangeException ex) {
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
+ // TODO: Clean: DRILL-2933: load(...) no longer throws SchemaChangeException
+ batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
incomingBatches[node.batchId].release();
batchOffsets[node.batchId] = 0;
@@ -457,12 +428,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
node.valueIndex = 0;
pqueue.add(node);
}
-
} else {
node.valueIndex++;
pqueue.add(node);
}
-
}
// set the value counts in the outgoing vectors
@@ -527,33 +496,30 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public void buildSchema() throws SchemaChangeException {
+ public void buildSchema() {
// find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
tempBatchHolder = new RawFragmentBatch[fragProviders.length];
int i = 0;
- try {
- while (true) {
- if (i >= fragProviders.length) {
- state = BatchState.DONE;
- return;
- }
- final RawFragmentBatch batch = getNext(i);
- if (batch == null) {
- checkContinue();
- }
- if (batch.getHeader().getDef().getFieldCount() == 0) {
- i++;
- continue;
- }
- tempBatchHolder[i] = batch;
- for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
- final ValueVector v = container.addOrGet(MaterializedField.create(field));
- v.allocateNew();
- }
+ while (true) {
+ if (i >= fragProviders.length) {
+ state = BatchState.DONE;
+ return;
+ }
+ final RawFragmentBatch batch = getNext(i);
+ if (batch == null) {
+ checkContinue();
break;
}
- } catch (final IOException e) {
- throw new DrillRuntimeException(e);
+ if (batch.getHeader().getDef().getFieldCount() == 0) {
+ i++;
+ continue;
+ }
+ tempBatchHolder[i] = batch;
+ for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+ final ValueVector v = container.addOrGet(MaterializedField.create(field));
+ v.allocateNew();
+ }
+ break;
}
container.buildSchema(SelectionVectorMode.NONE);
container.setEmpty();
@@ -625,18 +591,22 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
//No op
}
- private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
+ private void checkSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
final BatchSchema schema = batchLoaders[0].getSchema();
for (int i = 1; i < batchLoaders.length; i++) {
if (!schema.equals(batchLoaders[i].getSchema())) {
- logger.error("Schemas are different. Schema 1 : " + schema + ", Schema 2: " + batchLoaders[i].getSchema() );
- return false;
+ throw UserException.schemaChangeError()
+ .message("Incoming batches for merging receiver have different schemas!")
+ .addContext("Schema 1: %s, Schema 2: %s",
+ schema.toString())
+ .addContext("Schema 2: %s",
+ batchLoaders[i].getSchema().toString())
+ .build(logger);
}
}
- return true;
}
private void allocateOutgoing() {
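The last hunk turns a boolean schema check into a throwing one, so the caller no longer has to translate false into STOP. A sketch of the refactored validation, with plain Objects standing in for BatchSchema and the loader array:

    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class CheckSchemaSketch {
      private static final Logger logger = LoggerFactory.getLogger(CheckSchemaSketch.class);

      static void checkSameSchema(Object[] schemas) { // stands in for RecordBatchLoader[]
        for (int i = 1; i < schemas.length; i++) {
          if (!schemas[0].equals(schemas[i])) {
            // void + throw replaces boolean + STOP at the caller
            throw UserException.schemaChangeError()
                .message("Incoming batches for merging receiver have different schemas!")
                .addContext("Schema 1: %s", schemas[0])
                .addContext("Schema 2: %s", schemas[i])
                .build(logger);
          }
        }
      }
    }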
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 15b103d..49dc42e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -92,10 +93,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Terminal operator for producing ANALYZE statement. This operator is responsible for converting
- * obtained metadata, fetching absent metadata from the Metastore and storing resulting metadata into the Metastore.
+ * Terminal operator for the ANALYZE statement. This operator is
+ * responsible for converting the obtained metadata, fetching absent metadata
+ * from the Metastore, and storing the resulting metadata into the Metastore.
* <p>
- * This operator has two inputs: left input contains metadata and right input contains statistics metadata.
+ * This operator has two inputs: left input contains metadata and right input
+ * contains statistics metadata.
*/
public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataControllerPOP> {
private static final Logger logger = LoggerFactory.getLogger(MetadataControllerBatch.class);
@@ -183,11 +186,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
}
break;
default:
- context.getExecutorState()
- .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
- close();
- killIncoming(false);
- return IterOutcome.STOP;
+ throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
}
}
@@ -217,35 +216,17 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
//fall through
case OK:
assert !firstRight : "First batch should be OK_NEW_SCHEMA";
- try {
- appendStatistics(statisticsCollector);
- } catch (IOException e) {
- context.getExecutorState().fail(e);
- close();
- killIncoming(false);
- return IterOutcome.STOP;
- }
+ appendStatistics(statisticsCollector);
break;
default:
- context.getExecutorState()
- .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
- close();
- killIncoming(false);
- return IterOutcome.STOP;
+ throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
}
}
return null;
}
private IterOutcome handleLeftIncoming() {
- try {
- metadataUnits.addAll(getMetadataUnits(left.getContainer()));
- } catch (Exception e) {
- context.getExecutorState().fail(e);
- close();
- killIncoming(false);
- return IterOutcome.STOP;
- }
+ metadataUnits.addAll(getMetadataUnits(left.getContainer()));
return IterOutcome.OK;
}
@@ -265,11 +246,9 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
MetastoreTableInfo metastoreTableInfo = popConfig.getContext().metastoreTableInfo();
if (tables.basicRequests().hasMetastoreTableInfoChanged(metastoreTableInfo)) {
- context.getExecutorState()
- .fail(new IllegalStateException(String.format("Metadata for table [%s] was changed before analyze is finished", tableInfo.name())));
- close();
- killIncoming(false);
- return IterOutcome.STOP;
+ throw UserException.executionError(null)
+ .message("Metadata for table [%s] was changed before analyze is finished", tableInfo.name())
+ .build(logger);
}
modify.overwrite(metadataUnits)
@@ -668,7 +647,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
return metadataStatistics;
}
- private void appendStatistics(StatisticsRecordCollector statisticsCollector) throws IOException {
+ private void appendStatistics(StatisticsRecordCollector statisticsCollector) {
if (context.getOptions().getOption(PlannerSettings.STATISTICS_USE)) {
List<FieldConverter> fieldConverters = new ArrayList<>();
int fieldId = 0;
@@ -683,16 +662,22 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
fieldConverters.add(converter);
}
- for (int counter = 0; counter < right.getRecordCount(); counter++) {
- statisticsCollector.startStatisticsRecord();
- // write the current record
- for (FieldConverter converter : fieldConverters) {
- converter.setPosition(counter);
- converter.startField();
- converter.writeField();
- converter.endField();
+ try {
+ for (int counter = 0; counter < right.getRecordCount(); counter++) {
+ statisticsCollector.startStatisticsRecord();
+ // write the current record
+ for (FieldConverter converter : fieldConverters) {
+ converter.setPosition(counter);
+ converter.startField();
+ converter.writeField();
+ converter.endField();
+ }
+ statisticsCollector.endStatisticsRecord();
}
- statisticsCollector.endStatisticsRecord();
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .addContext("Failed to write metadata")
+ .build(logger);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index c302ef2..600a170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -72,8 +72,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
/**
- * Operator responsible for handling metadata returned by incoming aggregate operators and fetching
- * required metadata form the Metastore.
+ * Responsible for handling metadata returned by incoming aggregate operators
+ * and fetching the required metadata from the Metastore.
*/
public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHandlerPOP> {
private static final Logger logger = LoggerFactory.getLogger(MetadataHandlerBatch.class);
@@ -128,11 +128,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
case STOP:
return outcome;
default:
- context.getExecutorState()
- .fail(new UnsupportedOperationException("Unsupported upstream state " + outcome));
- close();
- killIncoming(false);
- return IterOutcome.STOP;
+ throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c32cdbf..f52554c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
@@ -50,7 +51,6 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.impl.sort.SortBatch;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -87,6 +87,8 @@ import com.sun.codemodel.JExpr;
* value is determined by where each record falls in the partition table. This
* column is used by PartitionSenderRootExec to determine which bucket to assign
* each record to.
+ * <p>
+ * This code is not used.
*/
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
@@ -267,7 +269,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
Thread.sleep(timeout);
} catch (final InterruptedException e) {
- throw new QueryCancelledException();
+ checkContinue();
}
}
}
@@ -281,13 +283,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* table, and attempts to push the partition table to the distributed cache.
* Whichever table gets pushed first becomes the table used by all fragments
* for partitioning.
- *
- * @return True is successful. False if failed.
*/
- private boolean getPartitionVectors() {
- if (!saveSamples()) {
- return false;
- }
+ private void getPartitionVectors() {
+ saveSamples();
CachedVectorContainer finalTable = null;
@@ -328,7 +326,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
for (VectorWrapper<?> w : finalTable.get()) {
partitionVectors.add(w.getValueVector());
}
- return true;
}
private void buildTable() {
@@ -398,17 +395,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
/**
- * Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer
- * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
- * Ordering determines the value of each column. These records will later be sorted based on the values in each
- * column, in the same order as the orderings.
- *
- * @param sv4
- * @param incoming
- * @param outgoing
- * @param orderings
- * @return
- * @throws SchemaChangeException
+ * Creates a copier that does a project for every Nth record from a
+ * VectorContainer incoming into VectorContainer outgoing. Each Ordering in
+ * orderings generates a column, and evaluation of the expression associated
+ * with each Ordering determines the value of each column. These records will
+ * later be sorted based on the values in each column, in the same order as
+ * the orderings.
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
List<Ordering> orderings, List<ValueVector> localAllocationVectors) {
@@ -546,7 +538,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* in the partition table
*
* @param batch
- * @throws SchemaChangeException
*/
protected void setupNewSchema(VectorAccessible batch) {
container.clear();
@@ -606,7 +597,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
} catch (SchemaChangeException e) {
- throw schemaChangeException(e, logger);
+ throw UserException.schemaChangeError(e)
+ .addContext("Unexpected schema change in the Ordered Partitioner")
+ .build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 18fe9bc..3b7f78a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -47,8 +47,8 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.CopyUtil;
@@ -211,9 +211,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
incoming.kill(false);
return false;
}
- for (VectorWrapper<?> v : incoming) {
- v.clear();
- }
+ VectorAccessibleUtilities.clear(incoming);
return true;
case NOT_YET:
default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index 8646192..ec9b550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -39,17 +39,19 @@ import org.apache.drill.exec.testing.CountDownLatchInjection;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Decorator class to hide multiple Partitioner existence from the caller
- * since this class involves multithreaded processing of incoming batches
- * as well as flushing it needs special handling of OperatorStats - stats
- * since stats are not suitable for use in multithreaded environment
- * The algorithm to figure out processing versus wait time is based on following formula:
- * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
+ * Decorator class to hide the existence of multiple Partitioners from the
+ * caller. Since this class involves multithreaded processing of incoming
+ * batches as well as flushing, it needs special handling of OperatorStats,
+ * because stats are not suitable for use in a multithreaded environment. The
+ * algorithm to figure out processing versus wait time is based on the formula:
+ * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
*/
public final class PartitionerDecorator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+ private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
private final List<Partitioner> partitioners;
@@ -394,7 +396,7 @@ public final class PartitionerDecorator {
}
public ExecutionException getException() {
- return this.exception;
+ return exception;
}
public OperatorStats getStats() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 9d67b5d..8c8cc54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -38,9 +38,11 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProducerConsumerBatch.class);
private final RecordBatch incoming;
private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
@@ -48,7 +50,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
private final BlockingDeque<RecordBatchDataWrapper> queue;
private int recordCount;
private BatchSchema schema;
- private boolean stop = false;
+ private boolean stop;
private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
@@ -77,8 +79,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
return IterOutcome.NONE;
} else if (wrapper.failed) {
return IterOutcome.STOP;
- } else if (wrapper.outOfMemory) {
- throw new OutOfMemoryException();
}
recordCount = wrapper.batch.getRecordCount();
@@ -143,13 +143,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
throw new UnsupportedOperationException();
}
}
- } catch (final OutOfMemoryException e) {
- try {
- queue.putFirst(RecordBatchDataWrapper.outOfMemory());
- } catch (final InterruptedException ex) {
- logger.error("Unable to enqueue the last batch indicator. Something is broken.", ex);
- // TODO InterruptedException
- }
} catch (final InterruptedException e) {
logger.warn("Producer thread is interrupted.", e);
throw new QueryCancelledException();
@@ -183,6 +176,10 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
@Override
protected void killIncoming(final boolean sendUpstream) {
stop = true;
+ }
+
+ @Override
+ public void close() {
producer.interrupt();
try {
producer.join();
@@ -190,11 +187,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
logger.warn("Interrupted while waiting for producer thread");
// TODO InterruptedException
}
- }
-
- @Override
- public void close() {
- stop = true;
try {
cleanUpLatch.await();
} catch (final InterruptedException e) {
@@ -216,29 +208,23 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
final RecordBatchData batch;
final boolean finished;
final boolean failed;
- final boolean outOfMemory;
- RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed, final boolean outOfMemory) {
+ RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
this.batch = batch;
this.finished = finished;
this.failed = failed;
- this.outOfMemory = outOfMemory;
}
public static RecordBatchDataWrapper batch(final RecordBatchData batch) {
- return new RecordBatchDataWrapper(batch, false, false, false);
+ return new RecordBatchDataWrapper(batch, false, false);
}
public static RecordBatchDataWrapper finished() {
- return new RecordBatchDataWrapper(null, true, false, false);
+ return new RecordBatchDataWrapper(null, true, false);
}
public static RecordBatchDataWrapper failed() {
- return new RecordBatchDataWrapper(null, false, true, false);
- }
-
- public static RecordBatchDataWrapper outOfMemory() {
- return new RecordBatchDataWrapper(null, false, false, true);
+ return new RecordBatchDataWrapper(null, false, true);
}
}
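The kill/close split above moves thread teardown onto the close path: killIncoming() now only raises a flag, while close() interrupts and joins the producer and waits for its cleanup. A simplified sketch of that shutdown ordering, not Drill source; the latch mirrors cleanUpLatch and the producer is expected to poll stopRequested() and exit on interrupt:

    import java.util.concurrent.CountDownLatch;

    class ProducerShutdownSketch implements AutoCloseable {
      private final Thread producer;
      private final CountDownLatch cleanUpLatch = new CountDownLatch(1);
      private volatile boolean stop;

      ProducerShutdownSketch(Runnable producerWork) {
        producer = new Thread(() -> {
          try {
            producerWork.run();
          } finally {
            cleanUpLatch.countDown(); // signal that producer cleanup is done
          }
        }, "Producer Thread");
        producer.start();
      }

      boolean stopRequested() {
        return stop;
      }

      void killIncoming() {
        stop = true; // only signal; no thread teardown here anymore
      }

      @Override
      public void close() {
        producer.interrupt(); // teardown now lives on the close path
        try {
          producer.join();
          cleanUpLatch.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }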
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 102bb4c..b3ca591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
* batches. The <tt>TransferPair</tt> abstraction fails if different
* vectors appear across batches.
*/
-
public class OperatorRecordBatch implements CloseableRecordBatch {
static final Logger logger = LoggerFactory.getLogger(OperatorRecordBatch.class);
@@ -149,10 +148,6 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
driver.operatorContext().getStats().startProcessing();
lastOutcome = driver.next();
return lastOutcome;
- } catch (Exception e) {
- // mark batch as failed
- lastOutcome = IterOutcome.STOP;
- throw e;
} finally {
driver.operatorContext().getStats().stopProcessing();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 7a61489..56c6246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -107,7 +107,7 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa
* @return True if the new schema differs from old schema, False otherwise
*/
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
container.clear();
for (VectorWrapper<?> vw : incoming) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 94770b6..240ee53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.AllocationReservation;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -158,11 +157,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
if (svBuffer == null) {
throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
}
- try {
- sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
- } catch (SchemaChangeException e) {
- throw AbstractRecordBatch.schemaChangeException(e, "Sort", logger);
- }
+ sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
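
Note: with the SelectionVector4 constructor unchecked (see the SelectionVector4 hunk later in this patch), the sort builder constructs the SV4 directly. A condensed sketch of the resulting call site (svBuffer and recordCount as in the hunk above):

    // Direct construction: the constructor now validates its own size
    // limit and throws an unchecked UserException on violation.
    if (svBuffer == null) {
      throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector.");
    }
    sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
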
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 104917e..7a88dc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -24,14 +24,12 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
@@ -47,8 +45,6 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.metastore.statistics.Statistic;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Example input and output:
@@ -86,9 +82,7 @@ import org.slf4j.LoggerFactory;
* .... another map for next stats function ....
* </pre>
*/
-
public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMerge> {
- private static final Logger logger = LoggerFactory.getLogger(StatisticsMergeBatch.class);
private final Map<String, String> functions;
private boolean first = true;
@@ -110,8 +104,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
* Creates key columns for the outgoing batch e.g. `schema`, `computed`. These columns are NOT
* table columns for which statistics will be computed.
*/
- private void createKeyColumn(String name, LogicalExpression expr)
- throws SchemaChangeException {
+ private void createKeyColumn(String name, LogicalExpression expr) {
LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
MaterializedField outputField = MaterializedField.create(name, mle.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -173,11 +166,12 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
}
}
- /* Prepare the outgoing container. Generates the outgoing record batch schema.
+ /**
+ * Prepares the outgoing container. Generates the outgoing record batch schema.
+ * Please look at the comments above the class definition, which describe the
+ * incoming/outgoing batch schema.
*/
- private void buildOutputContainer() throws SchemaChangeException {
+ private void buildOutputContainer() {
// Populate the list of statistics which will be output in the schema
for (VectorWrapper<?> vw : incoming) {
for (String outputStatName : functions.keySet()) {
@@ -223,12 +217,13 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
}
- /* Adds a value vector corresponding to the statistic in the outgoing record batch.
- * Determines the MajorType based on the incoming value vector. Please look at the
- * comments above the class definition which describes the incoming/outgoing batch schema
+ /**
+ * Adds a value vector corresponding to the statistic in the outgoing record
+ * batch. Determines the MajorType based on the incoming value vector. Please
+ * look at the comments above the class definition, which describe the
+ * incoming/outgoing batch schema.
*/
- private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw)
- throws SchemaChangeException {
+ private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw) {
// Input map vector
MapVector inputVector = (MapVector) vw.getValueVector();
assert inputVector.getPrimitiveVectors().size() > 0;
@@ -265,7 +260,8 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
}
}
- /* Prepare the outgoing container. Populates the outgoing record batch data.
+ /**
+ * Prepares the outgoing container. Populates the outgoing record batch data.
+ * Please look at the comments above the class definition, which describe the
+ * incoming/outgoing batch schema.
*/
@@ -301,7 +297,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
container.clear();
// Generate the list of fields for which statistics will be merged
buildColumnsList();
@@ -340,40 +336,34 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
if (finished) {
return IterOutcome.NONE;
}
- try {
- outer: while (true) {
- outcome = next(incoming);
- switch (outcome) {
- case NONE:
- break outer;
- case NOT_YET:
- case STOP:
- return outcome;
- case OK_NEW_SCHEMA:
- if (first) {
- first = false;
- if (!setupNewSchema()) {
- outcome = IterOutcome.OK;
- }
- return outcome;
+ outer: while (true) {
+ outcome = next(incoming);
+ switch (outcome) {
+ case NONE:
+ break outer;
+ case NOT_YET:
+ case STOP:
+ return outcome;
+ case OK_NEW_SCHEMA:
+ if (first) {
+ first = false;
+ if (!setupNewSchema()) {
+ outcome = IterOutcome.OK;
}
- //fall through
- case OK:
- assert first == false : "First batch should be OK_NEW_SCHEMA";
- IterOutcome out = doWork();
- didSomeWork = true;
- if (out != IterOutcome.OK) {
- return out;
- }
- break;
- default:
- throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
- }
+ return outcome;
+ }
+ //fall through
+ case OK:
+ assert !first : "First batch should be OK_NEW_SCHEMA";
+ IterOutcome out = doWork();
+ didSomeWork = true;
+ if (out != IterOutcome.OK) {
+ return out;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
}
- } catch (SchemaChangeException ex) {
- kill(false);
- context.getExecutorState().fail(UserException.unsupportedError(ex).build(logger));
- return IterOutcome.STOP;
}
// We can only get here if upstream is NONE i.e. no more batches. If we did some work prior to
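
Note: for readability, the restructured drain loop boils down to the following condensed sketch (NOT_YET/STOP pass-through elided here, though retained in the hunk above; names as in the class):

    // Upstream drain loop without the old SchemaChangeException catch:
    // schema and execution errors now surface as unchecked exceptions.
    outer: while (true) {
      outcome = next(incoming);
      switch (outcome) {
        case NONE:
          break outer;                 // upstream exhausted
        case OK_NEW_SCHEMA:
          if (first) {
            first = false;
            return setupNewSchema() ? outcome : IterOutcome.OK;
          }
          // fall through
        case OK:
          IterOutcome out = doWork();
          didSomeWork = true;
          if (out != IterOutcome.OK) {
            return out;
          }
          break;
        default:
          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
      }
    }
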
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index a9584bb..8a18093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.svremover;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -45,7 +44,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
// Don't clear off container just because an OK_NEW_SCHEMA was received from
// upstream. For cases when there is just
// change in container type but no actual schema change, RemovingRecordBatch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 85eceea..65d66ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -194,11 +194,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
stats.batchReceived(0, incoming.getRecordCount(), true);
memoryManager.update();
hasRemainder = incoming.getRecordCount() > 0;
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
} finally {
stats.stopSetup();
}
@@ -209,32 +204,25 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
- try {
- boolean hasNewSchema = schemaChanged();
- stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
- if (hasNewSchema) {
- setupNewSchema();
- hasRemainder = true;
- memoryManager.update();
- return OK_NEW_SCHEMA;
- } else { // Unnest field schema didn't changed but new left empty/nonempty batch might come with OK_NEW_SCHEMA
- // This means even though there is no schema change for unnest field the reference of unnest field
- // ValueVector must have changed hence we should just refresh the transfer pairs and keep output vector
- // same as before. In case when new left batch is received with SchemaChange but was empty Lateral will
- // not call next on unnest and will change it's left outcome to OK. Whereas for non-empty batch next will
- // be called on unnest by Lateral. Hence UNNEST cannot rely on lateral current outcome to setup transfer
- // pair. It should do for each new left incoming batch.
- resetUnnestTransferPair();
- container.zeroVectors();
- } // else
- unnest.resetGroupIndex();
+ boolean hasNewSchema = schemaChanged();
+ stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+ if (hasNewSchema) {
+ setupNewSchema();
+ hasRemainder = true;
memoryManager.update();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
+ return OK_NEW_SCHEMA;
+ } else { // Unnest field schema didn't change, but a new left empty/non-empty batch may come with OK_NEW_SCHEMA.
+ // Even though there is no schema change for the unnest field, the reference of the unnest field
+ // ValueVector may have changed, so we should just refresh the transfer pairs and keep the output vector
+ // the same as before. When a new left batch arrives with a schema change but is empty, Lateral will
+ // not call next on unnest and will change its left outcome to OK; for a non-empty batch, Lateral will
+ // call next on unnest. Hence UNNEST cannot rely on Lateral's current outcome to set up the transfer
+ // pair; it should do so for each new left incoming batch.
+ resetUnnestTransferPair();
+ container.zeroVectors();
+ } // else
+ unnest.resetGroupIndex();
+ memoryManager.update();
}
return doWork();
}
@@ -350,20 +338,24 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return tp;
}
- private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
+ private TransferPair resetUnnestTransferPair() {
List<TransferPair> transfers = Lists.newArrayList();
FieldReference fieldReference = new FieldReference(popConfig.getColumn());
TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
transfers.add(transferPair);
logger.debug("Added transfer for unnest expression.");
unnest.close();
- unnest.setup(context, incoming, this, transfers);
+ try {
+ unnest.setup(context, incoming, this, transfers);
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
+ }
setUnnestVector();
return transferPair;
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
Preconditions.checkNotNull(lateral);
container.clear();
MaterializedField rowIdField = MaterializedField.create(rowIdColumnName, Types.required(TypeProtos
@@ -380,13 +372,13 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
}
/**
- * Compares the schema of the unnest column in the current incoming with the schema of
- * the unnest column in the previous incoming.
- * Also saves the schema for comparison in future iterations
+ * Compares the schema of the unnest column in the current incoming with the
+ * schema of the unnest column in the previous incoming. Also saves the schema
+ * for comparison in future iterations
*
* @return true if the schema has changed, false otherwise
*/
- private boolean schemaChanged() throws SchemaChangeException {
+ private boolean schemaChanged() {
unnestTypedFieldId = checkAndGetUnnestFieldId();
MaterializedField thisField = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
MaterializedField prevField = unnestFieldMetadata;
@@ -430,12 +422,14 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
}
- private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
+ private TypedFieldId checkAndGetUnnestFieldId() {
TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
if (fieldId == null) {
- throw new SchemaChangeException(String.format("Unnest column %s not found inside the incoming record batch. " +
- "This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
- popConfig.getColumn()));
+ throw UserException.schemaChangeError(null)
+ .message(String.format("Unnest column %s not found inside the incoming record batch. " +
+ "This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
+ popConfig.getColumn()))
+ .build(logger);
}
return fieldId;
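
Note: the replacement above shows the UserException builder idiom used throughout this patch — categorize, attach a user-facing message, and build(logger) to log once at the failure site. A condensed sketch, assuming the varargs message(...) overload:

    // Fail fast with a categorized, user-facing error. build(logger)
    // also logs the exception, so no separate logging is needed.
    throw UserException.schemaChangeError(null)
        .message("Unnest column %s not found inside the incoming record batch. "
            + "This may happen if a wrong Unnest column name is used in the query.",
            popConfig.getColumn())
        .build(logger);
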
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index f6304aa..b94c551 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -22,9 +22,9 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Iterator;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
@@ -150,7 +150,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
return batchLoader.getValueAccessorById(clazz, ids);
}
- private RawFragmentBatch getNextBatch() throws IOException {
+ private RawFragmentBatch getNextBatch() {
try {
injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
return fragProvider.getNext();
@@ -161,6 +161,10 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
Thread.currentThread().interrupt();
return null;
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .addContext("Failure when reading incoming batch")
+ .build(logger);
}
}
@@ -215,13 +219,6 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
lastOutcome = IterOutcome.OK;
}
return lastOutcome;
- } catch (SchemaChangeException | IOException ex) {
- context.getExecutorState().fail(ex);
- lastOutcome = IterOutcome.STOP;
- return lastOutcome;
- } catch (Exception e) {
- lastOutcome = IterOutcome.STOP;
- throw e;
} finally {
stats.stopProcessing();
}
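
Note: getNextBatch() above illustrates boundary conversion — checked I/O failures become categorized UserExceptions at the point they occur, so next() needs no IOException plumbing. Condensed sketch (names as in the hunk):

    private RawFragmentBatch getNextBatch() {
      try {
        return fragProvider.getNext();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve interrupt status
        return null;                         // caller treats null as no data
      } catch (IOException e) {
        throw UserException.dataReadError(e)
            .addContext("Failure when reading incoming batch")
            .build(logger);
      }
    }
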
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 3562e9a..66e8b01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnpivotMaps;
@@ -116,38 +115,19 @@ public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMap
case STOP:
return upStream;
case OK_NEW_SCHEMA:
- if (first) {
- first = false;
- }
- try {
- if (!setupNewSchema()) {
- upStream = IterOutcome.OK;
- } else {
- return upStream;
- }
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- //fall through
+ first = false;
+ setupNewSchema();
+ return upStream;
+
case OK:
assert first == false : "First batch should be OK_NEW_SCHEMA";
- try {
- container.zeroVectors();
- IterOutcome out = doWork();
- // Preserve OK_NEW_SCHEMA unless doWork() runs into an issue
- if (out != IterOutcome.OK) {
- upStream = out;
- }
- } catch (Exception ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
+ container.zeroVectors();
+ IterOutcome out = doWork();
+ // Preserve OK_NEW_SCHEMA unless doWork() runs into an issue
+ if (out != IterOutcome.OK) {
+ upStream = out;
}
- return upStream;
+ return upStream;
default:
throw new UnsupportedOperationException("Unsupported upstream state " + upStream);
}
@@ -269,7 +249,7 @@ public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMap
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ protected boolean setupNewSchema() {
container.clear();
buildKeyList();
buildOutputContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
index 5288776..624b14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -34,9 +35,10 @@ import java.util.List;
/**
- * WindowFramer implementation that supports the FRAME clause.
- * <br>According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
- * This class will handle such functions even if the FRAME clause is not present.
+ * WindowFramer implementation that supports the FRAME clause. <br>
+ * According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate
+ * functions support the FRAME clause. This class will handle such functions
+ * even if the FRAME clause is not present.
*/
public abstract class FrameSupportTemplate implements WindowFramer {
@@ -90,7 +92,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
* processes all rows of the first batch.
*/
@Override
- public void doWork() throws DrillException {
+ public void doWork() throws SchemaChangeException {
int currentRow = 0;
this.current = batches.get(0);
@@ -144,7 +146,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
* @return index of next unprocessed row
* @throws DrillException if it can't write into the container
*/
- private int processPartition(final int currentRow) throws DrillException {
+ private int processPartition(final int currentRow) throws SchemaChangeException {
logger.trace("{} rows remaining to process, currentRow: {}, outputCount: {}", remainingRows, currentRow, outputCount);
setupWriteFirstValue(internal, container);
@@ -156,7 +158,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
}
}
- private int processROWS(int row) throws DrillException {
+ private int processROWS(int row) throws SchemaChangeException {
//TODO (DRILL-4413) we only need to call these once per batch
setupEvaluatePeer(current, container);
setupReadLastValue(current, container);
@@ -175,7 +177,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
return row;
}
- private int processRANGE(int row) throws DrillException {
+ private int processRANGE(int row) throws SchemaChangeException {
while (row < outputCount && !isPartitionDone()) {
if (remainingPeers == 0) {
// because all peer rows share the same frame, we only need to compute and aggregate the frame once
@@ -199,8 +201,10 @@ public abstract class FrameSupportTemplate implements WindowFramer {
}
/**
- * updates partition's length after computing the number of rows for the current the partition starting at the specified
- * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
+ * Updates the partition's length after computing the number of rows for the
+ * current partition starting at the specified row of the first batch. If
+ * !requiresFullPartition, this method will only count the rows in the current
+ * batch.
*/
private void updatePartitionSize(final int start) {
logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
@@ -245,12 +249,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
}
/**
- * aggregates all peer rows of current row
+ * Aggregates all peer rows of the current row.
* @param start starting row of the current frame
* @return num peer rows for current row
- * @throws SchemaChangeException
*/
- private long aggregatePeers(final int start) throws SchemaChangeException {
+ private long aggregatePeers(final int start) {
logger.trace("aggregating rows starting from {}", start);
final boolean unboundedFollowing = popConfig.getEnd().isUnbounded();
@@ -260,7 +263,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
// a single frame can include rows from multiple batches
// start processing first batch and, if necessary, move to next batches
for (WindowDataBatch batch : batches) {
- setupEvaluatePeer(batch, container);
+ try {
+ setupEvaluatePeer(batch, container);
+ } catch (SchemaChangeException e) {
+ throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+ }
final int recordCount = batch.getRecordCount();
// for every remaining row in the partition, count it if it's a peer row
@@ -281,7 +288,11 @@ public abstract class FrameSupportTemplate implements WindowFramer {
}
}
- setupReadLastValue(last, container);
+ try {
+ setupReadLastValue(last, container);
+ } catch (SchemaChangeException e) {
+ throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+ }
return length;
}
@@ -354,6 +365,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
* @param b2 batch for second row
* @return true if the rows are in the same partition
*/
+ @Override
public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
@Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
@@ -366,6 +378,7 @@ public abstract class FrameSupportTemplate implements WindowFramer {
* @param b2 batch for second row
* @return true if the rows are in the same partition
*/
+ @Override
public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
@Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
}
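
Note: because the code-generated setup methods still declare SchemaChangeException, template code wraps each call once and rethrows unchecked, naming the operator for a clearer error message. The pattern in isolation (a sketch; names as in the hunks above):

    try {
      setupEvaluatePeer(batch, container);
    } catch (SchemaChangeException e) {
      // Converts to a UserException that identifies the Window operator.
      throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
    }
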
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index a759399..61c8070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.window;
-import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -80,10 +80,10 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
}
/**
- * processes all rows of the first batch.
+ * Processes all rows of the first batch.
*/
@Override
- public void doWork() throws DrillException {
+ public void doWork() {
int currentRow = 0;
current = batches.get(0);
outputCount = current.getRecordCount();
@@ -103,17 +103,25 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
newPartition(current, currentRow);
}
- currentRow = processPartition(currentRow);
+ try {
+ currentRow = processPartition(currentRow);
+ } catch (SchemaChangeException e) {
+ throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+ }
if (partition.isDone()) {
cleanPartition();
}
}
}
- private void newPartition(WindowDataBatch current, int currentRow) throws SchemaChangeException {
+ private void newPartition(WindowDataBatch current, int currentRow) {
partition = new Partition();
updatePartitionSize(partition, currentRow);
- setupPartition(current, container);
+ try {
+ setupPartition(current, container);
+ } catch (SchemaChangeException e) {
+ throw AbstractRecordBatch.schemaChangeException(e, "Window", logger);
+ }
}
private void cleanPartition() {
@@ -138,10 +146,9 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
* @param currentRow
* first unprocessed row
* @return index of next unprocessed row
- * @throws DrillException
- * if it can't write into the container
+ * @throws SchemaChangeException
*/
- private int processPartition(int currentRow) throws DrillException {
+ private int processPartition(int currentRow) throws SchemaChangeException {
logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
setupCopyNext(current, container);
@@ -202,7 +209,7 @@ public abstract class NoFrameSupportTemplate implements WindowFramer {
}
}
- private void processRow(int row) throws DrillException {
+ private void processRow(int row) throws SchemaChangeException {
if (partition.isFrameDone()) {
// because all peer rows share the same frame, we only need to compute and aggregate the frame once
long peers = countPeers(row);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 496c776..ba8c06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -17,12 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.window;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
@@ -31,7 +29,6 @@ import org.apache.drill.common.logical.data.Order;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -145,13 +142,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
// process first saved batch, then release it
- try {
- doWork();
- } catch (DrillException e) {
- context.getExecutorState().fail(e);
- cleanup();
- return IterOutcome.STOP;
- }
+ doWork();
if (state == BatchState.FIRST) {
state = BatchState.NOT_FIRST;
@@ -160,7 +151,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
return IterOutcome.OK;
}
- private void doWork() throws DrillException {
+ private void doWork() {
WindowDataBatch current = batches.get(0);
int recordCount = current.getRecordCount();
@@ -170,8 +161,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
// allocate outgoing vectors
container.allocateNew();
- for (WindowFramer framer : framers) {
- framer.doWork();
+ try {
+ for (WindowFramer framer : framers) {
+ framer.doWork();
+ }
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
}
// transfer "non aggregated" vectors
@@ -225,7 +220,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
@Override
- protected void buildSchema() throws SchemaChangeException {
+ protected void buildSchema() {
logger.trace("buildSchema()");
IterOutcome outcome = next(incoming);
switch (outcome) {
@@ -242,8 +237,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
try {
createFramers(incoming);
- } catch (IOException | ClassTransformationException e) {
- throw new SchemaChangeException("Exception when creating the schema", e);
+ } catch (SchemaChangeException e) {
+ throw schemaChangeException(e, logger);
}
if (incoming.getRecordCount() > 0) {
@@ -251,7 +246,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
}
- private void createFramers(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
+ private void createFramers(VectorAccessible batch) throws SchemaChangeException {
assert framers == null : "createFramer should only be called once";
logger.trace("creating framer(s)");
@@ -321,7 +316,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
private WindowFramer generateFramer(List<LogicalExpression> keyExprs, List<LogicalExpression> orderExprs,
- List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+ List<WindowFunction> functions, boolean useCustomFrame) {
TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 4bb3d38..7552a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -40,9 +40,10 @@ public interface WindowFramer {
/**
* process the inner batch and write the aggregated values in the container
+ * @throws SchemaChangeException
- * @throws DrillException
*/
- void doWork() throws DrillException;
+ void doWork() throws SchemaChangeException;
/**
* @return number rows processed in last batch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index f05b9f2..3a54c5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.record;
import java.util.Iterator;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
@@ -178,12 +177,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
break;
}
return lastOutcome;
- } catch (SchemaChangeException e) {
- lastOutcome = IterOutcome.STOP;
- throw new DrillRuntimeException(e);
- } catch (Exception e) {
- lastOutcome = IterOutcome.STOP;
- throw e;
} finally {
stats.stopProcessing();
}
@@ -200,8 +193,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
}
- protected void buildSchema() throws SchemaChangeException {
- }
+ protected void buildSchema() { }
@Override
public void kill(boolean sendUpstream) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index d9b5f4b..8341b95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -24,16 +24,19 @@ import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.base.PhysicalOperator;
/**
- * Implements AbstractUnaryRecodBatch for operators that do not have an incoming record batch available at creation
- * time; the input is typically set up a few steps after creation. Table functions and operators like Unnest that
- * require input before they can produce output fall into this category.
- * Table functions can be associated with a Lateral operator in which case they simultaneously operate on the
- * same row as the Lateral operator. In this case the LateralContract member is not null and the table function uses the
- * lateral contract to keep in sync with the Lateral operator.
+ * Implements AbstractUnaryRecordBatch for operators that do not have an incoming
+ * record batch available at creation time; the input is typically set up a few
+ * steps after creation. Table functions and operators like Unnest that require
+ * input before they can produce output fall into this category. Table functions
+ * can be associated with a Lateral operator in which case they simultaneously
+ * operate on the same row as the Lateral operator. In this case the
+ * LateralContract member is not null and the table function uses the lateral
+ * contract to keep in sync with the Lateral operator.
+ *
* @param <T>
*/
public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperator> extends
- AbstractUnaryRecordBatch<T> implements TableFunctionContract{
+ AbstractUnaryRecordBatch<T> implements TableFunctionContract {
protected RecordBatch incoming;
protected LateralContract lateral;
@@ -48,12 +51,14 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato
return incoming;
}
+ @Override
public void setIncoming(RecordBatch incoming) {
Preconditions.checkArgument(this.incoming == null, "Incoming is already set. setIncoming cannot be called "
+ "multiple times.");
this.incoming = incoming;
}
+ @Override
public void setIncoming(LateralContract incoming) {
setIncoming(incoming.getIncoming());
lateral = incoming;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index 78acb10..5997a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -18,16 +18,13 @@
package org.apache.drill.exec.record;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * The base class for operators that have a single input. The concrete implementations provide the
+ * Base class for operators that have a single input. The concrete implementations provide the
* input by implementing the getIncoming() method
* Known implementations: AbstractSingleRecordBatch and AbstractTableFunctionRecordBatch.
* @see org.apache.drill.exec.record.AbstractRecordBatch
@@ -36,7 +33,6 @@ import org.slf4j.LoggerFactory;
* @param <T>
*/
public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
- private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private IterOutcome lastKnownOutcome;
@@ -98,11 +94,6 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
if (!setupNewSchema()) {
upstream = IterOutcome.OK;
}
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
} finally {
stats.stopSetup();
}
@@ -130,7 +121,7 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
}
}
- protected abstract boolean setupNewSchema() throws SchemaChangeException;
+ protected abstract boolean setupNewSchema();
protected abstract IterOutcome doWork();
/**
@@ -150,7 +141,7 @@ public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> exten
*/
protected IterOutcome handleNullInput() {
container.buildSchema(SelectionVectorMode.NONE);
- container.setRecordCount(0);
+ container.setEmpty();
return IterOutcome.NONE;
}
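
Note: container.setEmpty() above is a small readability win — it declares a zero-row batch in one call instead of the manual setRecordCount(0). Sketch of the null-input path after the change:

    protected IterOutcome handleNullInput() {
      container.buildSchema(SelectionVectorMode.NONE);
      container.setEmpty();   // declare a zero-row batch
      return IterOutcome.NONE;
    }
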
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 1f06710..064c601 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -75,7 +75,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
- * @throws SchemaChangeException
- * TODO: Clean: DRILL-2933 load(...) never actually throws SchemaChangeException.
*/
- public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
+ @SuppressWarnings("resource")
+ public boolean load(RecordBatchDef def, DrillBuf buf) {
if (logger.isTraceEnabled()) {
logger.trace("Loading record batch with def {} and data {}", def, buf);
logger.trace("Load, ThreadID: {}\n{}", Thread.currentThread().getId(), new StackTrace());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index c588f25..09200f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -31,8 +31,8 @@ public class SimpleRecordBatch implements RecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class);
- private VectorContainer container;
- private FragmentContext context;
+ private final VectorContainer container;
+ private final FragmentContext context;
public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
this.container = container;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
index 12b9053..f8fcb3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.record;
import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
/**
* VectorAccessible is an interface. Yet, several operations are done
@@ -37,6 +38,12 @@ public class VectorAccessibleUtilities {
}
}
+ public static void clear(Iterable<ValueVector> iter) {
+ for (final ValueVector v : iter) {
+ v.clear();
+ }
+ }
+
public static void setValueCount(VectorAccessible va, int count) {
for (VectorWrapper<?> w: va) {
w.getValueVector().getMutator().setValueCount(count);
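
Note: the new clear(Iterable<ValueVector>) overload complements the existing VectorAccessible-based helpers. A hypothetical call site (v1 and v2 are illustrative vectors, not from this patch):

    // Release the buffers of several loose vectors in one call.
    VectorAccessibleUtilities.clear(java.util.Arrays.asList(v1, v2));
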
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 4f4f88d..ffc2854 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -19,21 +19,27 @@ package org.apache.drill.exec.record.selection;
import io.netty.buffer.ByteBuf;
-import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.DeadBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SelectionVector4 implements AutoCloseable {
+ static final Logger logger = LoggerFactory.getLogger(SelectionVector4.class);
private ByteBuf data;
private int recordCount;
private int start;
private int length;
- public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+ public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) {
if (recordCount > Integer.MAX_VALUE / 4) {
- throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
- "You requested an allocation of %d bytes.", recordCount * 4L));
+ throw UserException.internalError(null)
+ .message(String.format(
+ "Currently, Drill can only support allocations up to 2gb in size. " +
+ "Query requested an allocation of %d bytes.", recordCount * 4L))
+ .build(logger);
}
this.recordCount = recordCount;
this.start = 0;
@@ -43,8 +49,11 @@ public class SelectionVector4 implements AutoCloseable {
public SelectionVector4(BufferAllocator allocator, int recordCount) {
if (recordCount > Integer.MAX_VALUE / 4) {
- throw new IllegalStateException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
- "You requested an allocation of %d bytes.", recordCount * 4L));
+ throw UserException.internalError(null)
+ .message(String.format(
+ "Currently, Drill can only support allocations up to 2gb in size. " +
+ "Query requested an allocation of %d bytes.", recordCount * 4L))
+ .build(logger);
}
this.recordCount = recordCount;
this.start = 0;
@@ -82,23 +91,24 @@ public class SelectionVector4 implements AutoCloseable {
}
/**
- * Caution: This method shares the underlying buffer between this vector and the newly created one.
- * @param batchRecordCount this will be used when creating the new vector
+ * Caution: This method shares the underlying buffer between this vector and
+ * the newly created one.
+ *
+ * @param batchRecordCount
+ * this will be used when creating the new vector
* @return Newly created single batch SelectionVector4.
*/
public SelectionVector4 createNewWrapperCurrent(int batchRecordCount) {
- try {
- data.retain();
- final SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
- sv4.start = this.start;
- return sv4;
- } catch (SchemaChangeException e) {
- throw new IllegalStateException("This shouldn't happen.");
- }
+ data.retain();
+ final SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
+ sv4.start = this.start;
+ return sv4;
}
/**
- * Caution: This method shares the underlying buffer between this vector and the newly created one.
+ * Caution: This method shares the underlying buffer between this vector and
+ * the newly created one.
+ *
* @return Newly created single batch SelectionVector4.
*/
public SelectionVector4 createNewWrapperCurrent() {
@@ -106,19 +116,15 @@ public class SelectionVector4 implements AutoCloseable {
}
public boolean next() {
-// logger.debug("Next called. Start: {}, Length: {}, recordCount: " + recordCount, start, length);
-
if (!hasNext()) {
start = recordCount;
length = 0;
-// logger.debug("Setting count to zero.");
return false;
}
start = start + length;
int newEnd = Math.min(start + length, recordCount);
length = newEnd - start;
-// logger.debug("New start {}, new length {}", start, length);
return true;
}
@@ -142,8 +148,8 @@ public class SelectionVector4 implements AutoCloseable {
this.recordCount = fromSV4.getTotalCount();
this.length = fromSV4.getCount();
this.data = fromSV4.getData();
- // Need to retain the data buffer since if fromSV4 clears out the buffer it's not actually released unless the
- // copied SV4 has also released it
+ // Need to retain the data buffer since if fromSV4 clears out the buffer
+ // it's not actually released unless the copied SV4 has also released it
if (data != DeadBuf.DEAD_BUFFER) {
this.data.retain();
}
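
Note: both SV4 constructors now share the same guard, expressed as an internal error rather than a checked SchemaChangeException or a bare IllegalStateException. In isolation (a sketch, assuming the varargs message(...) overload):

    // SV4 entries are 4 bytes each; reject requests past the 2 GB
    // allocation limit with a categorized, unchecked error.
    if (recordCount > Integer.MAX_VALUE / 4) {
      throw UserException.internalError(null)
          .message("Currently, Drill can only support allocations up to 2gb in size. "
              + "Query requested an allocation of %d bytes.", recordCount * 4L)
          .build(logger);
    }
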
diff --git a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
similarity index 74%
rename from exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
index 9c285d4..ee0bc2d 100644
--- a/exec/java-exec/src/main/codegen/templates/StatisticsRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StatisticsRecordWriter.java
@@ -15,24 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-<@pp.dropOutputFile />
-<@pp.changeOutputFile name="org/apache/drill/exec/store/StatisticsRecordWriter.java" />
-<#include "/@includes/license.ftl" />
-
package org.apache.drill.exec.store;
import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
-import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
import java.util.Map;
-/*
- * This class is generated using freemarker and the ${.template_name} template.
- */
-
-/** StatisticsRecordWriter interface. */
public interface StatisticsRecordWriter extends StatisticsRecordCollector {
/**
@@ -41,14 +30,14 @@ public interface StatisticsRecordWriter extends StatisticsRecordCollector {
* @param writerOptions Contains key, value pair of settings.
- * @throws IOException
*/
- void init(Map<String, String> writerOptions) throws IOException;
+ void init(Map<String, String> writerOptions);
/**
* Update the schema in RecordWriter. Called at least once before starting writing the records.
* @param batch
- * @throws IOException
*/
- void updateSchema(VectorAccessible batch) throws IOException;
+ void updateSchema(VectorAccessible batch);
/**
* Check if the writer should start a new partition, and if so, start a new partition
@@ -66,6 +55,6 @@ public interface StatisticsRecordWriter extends StatisticsRecordCollector {
* @throws IOException
*/
void flushBlockingWriter() throws IOException;
- void abort() throws IOException;
- void cleanup() throws IOException;
-}
\ No newline at end of file
+ void abort();
+ void cleanup();
+}
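
Note: after the interface narrowing, only the blocking flush remains checked. A sketch of a call site under that assumption (writeStats and its arguments are hypothetical, for illustration only):

    void writeStats(StatisticsRecordWriter writer, VectorAccessible batch,
        Map<String, String> options) throws IOException {
      writer.init(options);           // unchecked now
      writer.updateSchema(batch);     // unchecked now
      writer.flushBlockingWriter();   // still declares IOException
      writer.cleanup();               // unchecked now
    }
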
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 4a3d431..b73fa76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -48,10 +48,8 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private Path cleanUpLocation;
private String location;
- private boolean append;
private String prefix;
- private String fieldDelimiter;
private String extension;
private boolean useExtendedOutput;
@@ -62,10 +60,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private final JsonFactory factory = new JsonFactory();
private final StorageStrategy storageStrategy;
- // Record write status
- private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
-
- private Configuration fsConf;
+ private final Configuration fsConf;
public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
@@ -76,7 +71,6 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
public void init(Map<String, String> writerOptions) throws IOException {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
- this.fieldDelimiter = writerOptions.get("separator");
this.extension = writerOptions.get("extension");
this.useExtendedOutput = Boolean.parseBoolean(writerOptions.get("extended"));
this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
@@ -244,13 +238,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
@Override
public void startRecord() throws IOException {
gen.writeStartObject();
- fRecordStarted = true;
}
@Override
public void endRecord() throws IOException {
gen.writeEndObject();
- fRecordStarted = false;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
index 51cdbe2..8858e34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StatisticsRecordWriter;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.JSONBaseStatisticsRecordWriter;
@@ -40,13 +41,13 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
private String prefix;
private String extension;
private FileSystem fs = null;
- private Configuration fsConf;
- private FormatPlugin formatPlugin;
+ private final Configuration fsConf;
+ private final FormatPlugin formatPlugin;
private Path fileName = null;
private long recordsWritten = -1;
- private StatisticsCollectorImpl statisticsCollector = new StatisticsCollectorImpl();
+ private final StatisticsCollectorImpl statisticsCollector = new StatisticsCollectorImpl();
public JsonStatisticsRecordWriter(Configuration fsConf, FormatPlugin formatPlugin) {
this.fsConf = fsConf;
@@ -54,7 +55,7 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
}
@Override
- public void init(Map<String, String> writerOptions) throws IOException {
+ public void init(Map<String, String> writerOptions) {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
this.extension = writerOptions.get("extension");
@@ -70,8 +71,9 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
fs.delete(fileName, false);
}
} catch (IOException ex) {
- logger.error("Unable to delete tmp file (corrupt): " + fileName, ex);
- throw ex;
+ throw UserException.dataWriteError(ex)
+ .addContext("Unable to delete tmp statistics file", fileName)
+ .build(logger);
}
try {
// Delete the tmp file and .stats.drill on exit. After writing out the permanent file
@@ -81,8 +83,9 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
fs.deleteOnExit(new Path(location));
logger.debug("Created file: {}", fileName);
} catch (IOException ex) {
- logger.error("Unable to create file: " + fileName, ex);
- throw ex;
+ throw UserException.dataWriteError(ex)
+ .addContext("Unable to create stistics file", fileName)
+ .build(logger);
}
}
@@ -186,13 +189,13 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
}
@Override
- public void abort() throws IOException {
+ public void abort() {
// Invoke cleanup to clear any .tmp files and/or empty statistics directory
cleanup();
}
@Override
- public void cleanup() throws IOException {
+ public void cleanup() {
Path permFileName = new Path(location, prefix + "." + extension);
try {
// Remove the .tmp file, if any
@@ -206,8 +209,8 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter i
logger.debug("Deleted directory: {}", location);
}
} catch (IOException ex) {
- logger.error("Unable to delete tmp file: " + fileName, ex);
- throw ex;
+ // Warn but continue
+ logger.warn("Unable to delete tmp satistics file: " + fileName, ex);
}
}
}
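
Note: the cleanup() change above encodes a deliberate policy — cleanup often runs on a failure path, so a secondary delete failure is logged rather than thrown, preserving the original error for the user. The shape of the pattern (sketch; fs and fileName as in the class):

    public void cleanup() {
      try {
        if (fs.exists(fileName)) {
          fs.delete(fileName, false);   // best effort
        }
      } catch (IOException ex) {
        // Warn but continue: don't mask the error that got us here.
        logger.warn("Unable to delete tmp statistics file: " + fileName, ex);
      }
    }
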
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 161fa83..3b9ed1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -55,9 +55,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
/**
- * <h2>Overview</h2>
- * <p>
- * Responsible for running a single fragment on a single Drillbit.
+ * Runs a single fragment on a single Drillbit.
* Listens/responds to status request and cancellation messages.
- * </p>
* <h2>Theory of Operation</h2>
@@ -143,6 +141,7 @@ public class FragmentExecutor implements Runnable {
private volatile RootExec root;
private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+
/**
* Holds all of the messages sent by downstream receivers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the
* finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method.
@@ -209,11 +208,11 @@ public class FragmentExecutor implements Runnable {
* before this check. This caused a concurrent modification exception as the list of operator
* stats is iterated over while collecting info, and added to while building the operator tree.
*/
- if (fragmentState.get() != FragmentState.RUNNING) {
+ if (fragmentState.get() == FragmentState.RUNNING) {
+ return statusReporter.getStatus(FragmentState.RUNNING);
+ } else {
return null;
}
-
- return statusReporter.getStatus(FragmentState.RUNNING);
}
/**
@@ -237,32 +236,31 @@ public class FragmentExecutor implements Runnable {
}
private void cleanup(FragmentState state) {
- if (root != null && fragmentState.get() == FragmentState.FAILED) {
- root.dumpBatches();
- }
-
closeOutResources();
updateState(state);
// send the final state of the fragment. only the main execution thread can send the final state and it can
// only be sent once.
sendFinalState();
-
}
/**
- * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
- * the one running this runnable(). Also, this method can be called multiple times.
+ * Resume all the pauses within the current context. Note that this method
+ * will be called from threads *other* than the one running this runnable().
+ * Also, this method can be called multiple times.
*/
public synchronized void unpause() {
fragmentContext.getExecutionControls().unpauseAll();
}
/**
- * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
- * called in the case that a limit query is executed.
+ * Inform this fragment that one of its downstream partners no longer needs
+ * additional records. This is most commonly called in the case that a limit
+ * query is executed.
*
- * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+ * @param handle
+ * The downstream FragmentHandle of the Fragment that needs no more
+ * records from this Fragment.
*/
public void receivingFragmentFinished(final FragmentHandle handle) {
eventProcessor.receiverFinished(handle);
@@ -283,7 +281,6 @@ public class FragmentExecutor implements Runnable {
final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
try {
-
myThread.setName(newThreadName);
// if we didn't get the root operator when the executor was created, create it now.
@@ -337,6 +334,7 @@ public class FragmentExecutor implements Runnable {
// Ignore: indicates query cancelled by this executor
} catch (OutOfMemoryError | OutOfMemoryException e) {
if (FailureUtils.isDirectMemoryOOM(e)) {
+ root.dumpBatches(e);
fail(UserException.memoryError(e).build(logger));
} else {
// we have a heap out of memory error. The JVM is unstable, exit.
@@ -346,6 +344,7 @@ public class FragmentExecutor implements Runnable {
// Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query
logger.trace("Interrupted root: {}", root, e);
} catch (Throwable t) {
+ root.dumpBatches(t);
fail(t);
} finally {
@@ -363,7 +362,6 @@ public class FragmentExecutor implements Runnable {
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
myThread.setName(originalThreadName);
-
}
}
@@ -400,7 +398,6 @@ public class FragmentExecutor implements Runnable {
statusReporter.close();
}
-
private void closeOutResources() {
// first close the operators and release all memory.
@@ -416,7 +413,6 @@ public class FragmentExecutor implements Runnable {
// then close the fragment context.
fragmentContext.close();
-
}
private void warnStateChange(final FragmentState current, final FragmentState target) {
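Taken together, the run() hunks above move batch dumping from cleanup() onto the failure paths, so diagnostics are captured together with the throwable that caused them. A self-contained sketch of the resulting control flow; Root and fail() are stand-ins for RootExec and FragmentExecutor.fail(), not the actual classes:

public class FailurePathSketch {
  interface Root {
    boolean next();                // false once the fragment is done
    void dumpBatches(Throwable t); // log batch state for diagnostics
  }

  private final Root root;

  FailurePathSketch(Root root) { this.root = root; }

  void run() {
    try {
      while (root.next()) {
        // drive the fragment until it reports completion
      }
    } catch (Throwable t) {
      root.dumpBatches(t); // dump only on the failure path
      fail(t);
    }
  }

  private void fail(Throwable t) {
    // stand-in for FragmentExecutor.fail(): record the failed state
    throw new RuntimeException(t);
  }
}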
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 2fc9158..054cdee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -126,7 +126,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
@Override
public void close() {
container.clear();
- container.setRecordCount(0);
+ container.setEmpty();
currentContainerIndex = 0;
currentOutcomeIndex = 0;
if (sv2 != null) {
@@ -291,8 +291,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
}
@Override
- public void dump() {
- }
+ public void dump() { }
public static class Builder {
private final List<RowSet> rowSets = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 68cf18f..8f0d677 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -113,8 +113,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
}
@Override
- public void dumpBatches() {
- screenRoot.dumpBatches();
+ public void dumpBatches(Throwable t) {
+ screenRoot.dumpBatches(t);
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java
new file mode 100644
index 0000000..fd8f2ac
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStackAnalyzer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * Test the function which finds the leaf-most operator within
+ * an exception call stack. Does the tests using dummy classes
+ * (which is why the stack analyzer function is parameterized.)
+ */
+public class TestStackAnalyzer {
+
+ private static class OperA {
+ public void throwNow() {
+ throw new RuntimeException();
+ }
+
+ public void throwIndirect() {
+ throwNow();
+ }
+
+ public void throwViaB(OperB b) {
+ b.throwIndirect();
+ }
+
+ public void throwAfterB(OperB b) {
+ new RandomC().throwAfterB(b);
+ }
+ }
+
+ private static class OperB {
+ public void throwNow() {
+ throw new RuntimeException();
+ }
+
+ public void throwIndirect() {
+ throwNow();
+ }
+
+ public void throwAfterB() {
+ new RandomC().throwNow();
+ }
+ }
+
+ private static class RandomC {
+ public void throwNow() {
+ throw new RuntimeException();
+ }
+
+ public void throwAfterB(OperB b) {
+ b.throwAfterB();
+ }
+ }
+
+ @Test
+ public void testEmptyStack() {
+ try {
+ throw new RuntimeException();
+ } catch (RuntimeException e) {
+ assertNull(BaseRootExec.findLeaf(Collections.emptyList(), e));
+ }
+ }
+
+ @Test
+ public void testOneLevel() {
+ OperA a = new OperA();
+ try {
+ a.throwNow();
+ } catch (RuntimeException e) {
+ List<Object> ops = Collections.singletonList(a);
+ assertSame(a, BaseRootExec.findLeaf(ops, e));
+ }
+ }
+
+ @Test
+ public void testOneLevelTwoDeep() {
+ OperA a = new OperA();
+ try {
+ a.throwIndirect();
+ } catch (RuntimeException e) {
+ List<Object> ops = Collections.singletonList(a);
+ assertSame(a, BaseRootExec.findLeaf(ops, e));
+ }
+ }
+
+ @Test
+ public void testTwoLevels() {
+ OperA a = new OperA();
+ OperB b = new OperB();
+ try {
+ a.throwViaB(b);
+ } catch (RuntimeException e) {
+ List<Object> ops = Arrays.asList(a, b);
+ assertSame(b, BaseRootExec.findLeaf(ops, e));
+ }
+ }
+
+ @Test
+ public void testTwoLevelsWithExtra() {
+ OperA a = new OperA();
+ OperB b = new OperB();
+ try {
+ a.throwAfterB(b);
+ } catch (RuntimeException e) {
+ List<Object> ops = Arrays.asList(a, b);
+ assertSame(b, BaseRootExec.findLeaf(ops, e));
+ }
+ }
+}
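The tests above pin down the contract of BaseRootExec.findLeaf(ops, e): given the fragment's operators and an exception, return the operator closest to the throw site, or null when none is on the stack. One plausible implementation (an assumption for illustration, not Drill's actual code) walks the stack trace from the throw site outward and returns the first operator whose class owns a frame:

import java.util.List;

public class FindLeafSketch {
  public static <T> T findLeaf(List<T> ops, Throwable e) {
    // Frames are ordered from the throw site outward, so the first
    // matching operator is the leaf-most one.
    for (StackTraceElement frame : e.getStackTrace()) {
      for (T op : ops) {
        if (op.getClass().getName().equals(frame.getClassName())) {
          return op;
        }
      }
    }
    return null; // empty operator list, or error outside any operator
  }
}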
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 7ef94f7..9e47915 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -140,7 +140,7 @@ public class TestPartitionSender extends PlanTestBase {
final SelectionVector4 sv = Mockito.mock(SelectionVector4.class, "SelectionVector4");
Mockito.when(sv.getCount()).thenReturn(100);
Mockito.when(sv.getTotalCount()).thenReturn(100);
- for (int i = 0; i < 100; i++ ) {
+ for (int i = 0; i < 100; i++) {
Mockito.when(sv.get(i)).thenReturn(i);
}
@@ -165,8 +165,8 @@ public class TestPartitionSender extends PlanTestBase {
// get HashToRandomExchange physical operator
HashToRandomExchange hashToRandomExchange = null;
- for ( PhysicalOperator operator : operators) {
- if ( operator instanceof HashToRandomExchange) {
+ for (PhysicalOperator operator : operators) {
+ if (operator instanceof HashToRandomExchange) {
hashToRandomExchange = (HashToRandomExchange) operator;
break;
}
@@ -241,7 +241,7 @@ public class TestPartitionSender extends PlanTestBase {
final int actualThreads = DRILLBITS_COUNT > expectedThreadsCount ? expectedThreadsCount : DRILLBITS_COUNT;
assertEquals("Number of partitioners", actualThreads, partitioners.size());
- for ( int i = 0; i < mfEndPoints.size(); i++) {
+ for (int i = 0; i < mfEndPoints.size(); i++) {
assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i));
}
@@ -249,10 +249,11 @@ public class TestPartitionSender extends PlanTestBase {
boolean isFirst = true;
int prevBatchCountSize = 0;
int batchCountSize = 0;
- for (Partitioner part : partitioners ) {
+ for (Partitioner part : partitioners) {
+ @SuppressWarnings("unchecked")
final List<PartitionOutgoingBatch> outBatch = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
batchCountSize = outBatch.size();
- if ( !isFirst ) {
+ if (!isFirst) {
assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1);
} else {
isFirst = false;
@@ -266,7 +267,7 @@ public class TestPartitionSender extends PlanTestBase {
} finally {
partionSenderRootExec.getStats().stopProcessing();
}
- if ( actualThreads == 1 ) {
+ if (actualThreads == 1) {
assertEquals("With single thread parent and child waitNanos should match", partitioners.get(0).getStats().getWaitNanos(), partionSenderRootExec.getStats().getWaitNanos());
}
@@ -274,11 +275,12 @@ public class TestPartitionSender extends PlanTestBase {
partitioners = partDecor.getPartitioners();
isFirst = true;
// since we have fake Nullvector distribution is skewed
- for (Partitioner part : partitioners ) {
+ for (Partitioner part : partitioners) {
+ @SuppressWarnings("unchecked")
final List<PartitionOutgoingBatch> outBatches = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
- for (PartitionOutgoingBatch partOutBatch : outBatches ) {
+ for (PartitionOutgoingBatch partOutBatch : outBatches) {
final int recordCount = ((VectorAccessible) partOutBatch).getRecordCount();
- if ( isFirst ) {
+ if (isFirst) {
assertEquals("RecordCount", 100, recordCount);
isFirst = false;
} else {
@@ -296,8 +298,8 @@ public class TestPartitionSender extends PlanTestBase {
final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder();
partionSenderRootExec.getStats().addAllMetrics(oPBuilder);
final List<MetricValue> metrics = oPBuilder.getMetricList();
- for ( MetricValue metric : metrics) {
- if ( Metric.BYTES_SENT.metricId() == metric.getMetricId() ) {
+ for (MetricValue metric : metrics) {
+ if (Metric.BYTES_SENT.metricId() == metric.getMetricId()) {
assertEquals("Should add metricValue irrespective of exception", 5*actualThreads, metric.getLongValue());
}
if (Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) {
@@ -327,7 +329,7 @@ public class TestPartitionSender extends PlanTestBase {
int numberPartitions;
int k = 0;
final Random rand = new Random();
- while ( k < 1000 ) {
+ while (k < 1000) {
outGoingBatchCount = rand.nextInt(1000)+1;
numberPartitions = rand.nextInt(32)+1;
final int actualPartitions = outGoingBatchCount > numberPartitions ? numberPartitions : outGoingBatchCount;
@@ -339,11 +341,11 @@ public class TestPartitionSender extends PlanTestBase {
for (int i = 0; i < actualPartitions; i++) {
startIndex = endIndex;
endIndex = startIndex + divisor;
- if ( i < longTail ) {
+ if (i < longTail) {
endIndex++;
}
}
- assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount );
+ assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount);
k++;
}
}
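The loop above checks the arithmetic that spreads outGoingBatchCount batches over actualPartitions ranges. Assuming divisor = count / partitions and longTail = count % partitions (their definitions sit outside this hunk), giving the first longTail partitions one extra batch makes the ranges cover the batches exactly, which is what the final assertion verifies. A compact sketch of that split under those assumptions:

static void checkRangeSplit(int outGoingBatchCount, int numberPartitions) {
  int actualPartitions = Math.min(outGoingBatchCount, numberPartitions);
  int divisor = outGoingBatchCount / actualPartitions;  // assumed definition
  int longTail = outGoingBatchCount % actualPartitions; // assumed definition
  int endIndex = 0;
  for (int i = 0; i < actualPartitions; i++) {
    int startIndex = endIndex;
    endIndex = startIndex + divisor + (i < longTail ? 1 : 0);
  }
  // divisor * actualPartitions + longTail == outGoingBatchCount
  assert endIndex == outGoingBatchCount;
}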
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index b3e1d8e..0baf0a0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -62,14 +62,12 @@ import static org.junit.Assert.fail;
@Category(OperatorTest.class)
public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
-
// Operator Context for mock batch
public static OperatorContext operatorContext;
public static PhysicalOperator mockPopConfig;
public static LateralJoinPOP ljPopConfig;
-
@BeforeClass public static void setUpBeforeClass() throws Exception {
mockPopConfig = new MockStorePOP(null);
ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
@@ -112,7 +110,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
@Test
@@ -146,7 +143,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
@Test
@@ -167,7 +163,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
@Test
@@ -198,7 +193,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
@Test
@@ -246,7 +240,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
@Test
@@ -295,7 +288,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
@@ -437,7 +429,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
}
-
}
@Test
@@ -495,7 +486,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
}
-
}
@Test
@@ -528,10 +518,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
-
// test unnest for various input conditions without invoking kill
private <T> void testUnnest(
TupleMetadata[] incomingSchemas,
@@ -559,7 +547,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
int rowNumber = 0;
int batchNum = 0;
- for ( Object[] recordBatch : data) {
+ for (Object[] recordBatch : data) {
RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
for ( Object rowData : recordBatch) {
rowSetBuilder.addRow(++rowNumber, rowData);
@@ -604,28 +592,29 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
// Simulate the pipeline by calling next on the incoming
- // results is an array ot batches, each batch being an array of output vectors.
+ // results is an array of batches, each batch being an array of output vectors.
List<List<ValueVector> > resultList = new ArrayList<>();
List<List<ValueVector> > results = null;
int batchesProcessed = 0;
try{
- try {
- while (!isTerminal(lateralJoinBatch.next())) {
- if (lateralJoinBatch.getRecordCount() > 0) {
- addBatchToResults(resultList, lateralJoinBatch);
- }
- batchesProcessed++;
- if (batchesProcessed == execKill) {
- lateralJoinBatch.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
- lateralJoinBatch.kill(true);
+ try {
+ while (!isTerminal(lateralJoinBatch.next())) {
+ if (lateralJoinBatch.getRecordCount() > 0) {
+ addBatchToResults(resultList, lateralJoinBatch);
+ }
+ batchesProcessed++;
+ if (batchesProcessed == execKill) {
+ // Errors are reported by throwing an exception.
+ // Simulate by skipping out of the loop
+ break;
+ }
+ // else nothing to do
}
- // else nothing to do
+ } catch (UserException e) {
+ throw e;
+ } catch (Exception e) {
+ fail(e.getMessage());
}
- } catch (UserException e) {
- throw e;
- } catch (Exception e) {
- throw new Exception ("Test failed to execute lateralJoinBatch.next() because: " + e.getMessage());
- }
// Check results against baseline
results = resultList;
@@ -633,7 +622,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
int batchIndex = 0;
int vectorIndex = 0;
//int valueIndex = 0;
- for ( List<ValueVector> batch: results) {
+ for (List<ValueVector> batch: results) {
int vectorCount= batch.size();
int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
expectedVectorCount += baseline[batchIndex].length;
@@ -698,7 +687,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
rowSet.clear();
}
}
-
}
/**
@@ -849,8 +837,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
* @param <T>
* @throws Exception
*/
-
-
private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
RecordBatch.IterOutcome[] iterOutcomes,
int execKill, // number of batches after which to kill the execution (!)
@@ -1016,7 +1002,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
rowSet.clear();
}
}
-
}
@Test
@@ -1037,8 +1022,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
}
-
}
-
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 83627c6..206221d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,7 +47,6 @@ import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.physical.impl.ScreenCreator;
@@ -249,13 +248,9 @@ public class TestDrillbitResilience extends DrillTest {
public void rowArrived(final QueryDataBatch queryResultBatch) {
// load the single record
final QueryData queryData = queryResultBatch.getHeader();
- try {
- loader.load(queryData.getDef(), queryResultBatch.getData());
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- } catch (final SchemaChangeException e) {
- fail(e.toString());
- }
+ // TODO: Clean: DRILL-2933: That load(...) no longer throws
+ // SchemaChangeException.
+ loader.load(queryData.getDef(), queryResultBatch.getData());
assertEquals(1, loader.getRecordCount());
// there should only be one column
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 6d9be9b..c31807b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -48,7 +47,7 @@ public class ParquetResultListener implements UserResultsListener {
int count = 0;
int totalRecords;
- private boolean testValues;
+ private final boolean testValues;
private final BufferAllocator allocator;
int batchCounter = 1;
@@ -109,13 +108,7 @@ public class ParquetResultListener implements UserResultsListener {
count += result.getHeader().getRowCount();
boolean schemaChanged = false;
final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
- try {
- schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
- } catch (SchemaChangeException e) {
- throw new RuntimeException(e);
- }
+ schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
// used to make sure each vector in the batch has the same number of records
int valueCount = batchLoader.getRecordCount();
@@ -124,7 +117,7 @@ public class ParquetResultListener implements UserResultsListener {
if (schemaChanged) {
} // do not believe any change is needed for when the schema changes, with the current mock scan use case
- for (final VectorWrapper vw : batchLoader) {
+ for (final VectorWrapper<?> vw : batchLoader) {
final ValueVector vv = vw.getValueVector();
currentField = props.fields.get(vv.getField().getName());
if (!valuesChecked.containsKey(vv.getField().getName())) {
@@ -210,7 +203,7 @@ public class ParquetResultListener implements UserResultsListener {
if (i % 50 == 0) {
final StringBuilder sb = new StringBuilder();
- for (VectorWrapper vw : batchLoader) {
+ for (VectorWrapper<?> vw : batchLoader) {
ValueVector v = vw.getValueVector();
sb.append(Strings.padStart(v.getField().getName(), 20, ' ') + " ");
}
@@ -220,7 +213,7 @@ public class ParquetResultListener implements UserResultsListener {
final StringBuilder sb = new StringBuilder();
- for (final VectorWrapper vw : batchLoader) {
+ for (final VectorWrapper<?> vw : batchLoader) {
final ValueVector v = vw.getValueVector();
Object o = v.getAccessor().getObject(i);
if (o instanceof byte[]) {
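The DRILL-2933 cleanups here and in the neighboring test files all take the same shape: RecordBatchLoader.load(...) no longer declares SchemaChangeException, so the try/catch scaffolding collapses into a plain call. A minimal sketch of the resulting pattern, assuming an allocator and a QueryDataBatch result in scope as in the listener above:

final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
// load(...) reports a schema change through its return value,
// not through a checked exception.
boolean schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData());
int valueCount = batchLoader.getRecordCount(); // rows in the loaded batch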
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 2fa8c75..a19fd74 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -63,11 +63,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An object to encapsulate the options for a Drill unit test, as well as the execution methods to perform the tests and
- * validation of results.
- *
- * To construct an instance easily, look at the TestBuilder class. From an implementation of
- * the BaseTestQuery class, and instance of the builder is accessible through the testBuilder() method.
+ * Encapsulates the options for a Drill unit test, as well as the execution
+ * methods to perform the tests and validation of results.
+ * <p>
+ * To construct an instance easily, look at the TestBuilder class. From an
+ * implementation of the BaseTestQuery class, an instance of the builder is
+ * accessible through the testBuilder() method.
*/
public class DrillTestWrapper {
@@ -98,39 +99,39 @@ public class DrillTestWrapper {
// one case where the setup for the baseline is driven by the test query results, and this is implicit type enforcement
// for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information
// from the test query.
- private TestBuilder testBuilder;
+ private final TestBuilder testBuilder;
/**
* Test query to run. Type of object depends on the {@link #queryType}
*/
- private Object query;
+ private final Object query;
// The type of query provided
- private UserBitShared.QueryType queryType;
+ private final UserBitShared.QueryType queryType;
// The type of query provided for the baseline
- private UserBitShared.QueryType baselineQueryType;
+ private final UserBitShared.QueryType baselineQueryType;
// should ordering be enforced in the baseline check
- private boolean ordered;
- private TestServices services;
+ private final boolean ordered;
+ private final TestServices services;
// queries to run before the baseline or test queries, can be used to set options
- private String baselineOptionSettingQueries;
- private String testOptionSettingQueries;
+ private final String baselineOptionSettingQueries;
+ private final String testOptionSettingQueries;
// allow approximate equality tests for number types
- private boolean approximateEquality;
+ private final boolean approximateEquality;
// tolerance for approximate equality tests defined as |Expected - Actual|/|Expected| <= Tolerance
- private double tolerance;
+ private final double tolerance;
// two different methods are available for comparing ordered results, the default reads all of the records
// into giant lists of objects, like one giant on-heap batch of 'vectors'
// this flag enables the other approach which iterates through a hyper batch for the test query results and baseline
// while this does work faster and use less memory, it can be harder to debug as all of the elements are not in a
// single list
- private boolean highPerformanceComparison;
+ private final boolean highPerformanceComparison;
// if the baseline is a single option test writers can provide the baseline values and columns
// without creating a file, these are provided to the builder in the baselineValues() and baselineColumns() methods
// and translated into a map in the builder
- private String[] baselineColumns;
- private List<Map<String, Object>> baselineRecords;
+ private final String[] baselineColumns;
+ private final List<Map<String, Object>> baselineRecords;
- private int expectedNumBatches;
- private int expectedNumRecords;
+ private final int expectedNumBatches;
+ private final int expectedNumRecords;
public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries,
@@ -312,11 +313,7 @@ public class DrillTestWrapper {
}
batchLoader.clear();
QueryDataBatch batch = dataBatches.get(index);
- try {
- batchLoader.load(batch.getHeader().getDef(), batch.getData());
- } catch (SchemaChangeException e) {
- throw new RuntimeException(e);
- }
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
return batchLoader;
}
@@ -421,6 +418,7 @@ public class DrillTestWrapper {
case FOUR_BYTE:
sv4 = loader.getSelectionVector4();
break;
+ default:
}
if (sv4 != null) {
for (int j = 0; j < sv4.getCount(); j++) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 2352d0a..6b0641d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -67,21 +67,18 @@ import static org.junit.Assert.assertEquals;
* Builder for a Drill query. Provides all types of query formats,
* and a variety of ways to run the query.
*/
-
public class QueryBuilder {
/**
* Listener used to retrieve the query summary (only) asynchronously
* using a {@link QuerySummaryFuture}.
*/
-
public static class SummaryOnlyQueryEventListener implements UserResultsListener {
/**
* The future to be notified. Created here and returned by the
* query builder.
*/
-
private final QuerySummaryFuture future;
private QueryId queryId;
private int recordCount;
@@ -374,28 +371,24 @@ public class QueryBuilder {
// Unload the batch and convert to a row set.
RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
- try {
- loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
- resultBatch.release();
- VectorContainer container = loader.getContainer();
- container.setRecordCount(loader.getRecordCount());
-
- // Null results? Drill will return a single batch with no rows
- // and no columns even if the scan (or other) operator returns
- // no batches at all. For ease of testing, simply map this null
- // result set to a null output row set that says "nothing at all
- // was returned." Note that this is different than an empty result
- // set which has a schema, but no rows.
-
- if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
- container.clear();
- return null;
- }
-
- return DirectRowSet.fromContainer(container);
- } catch (SchemaChangeException e) {
- throw new IllegalStateException(e);
+ loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
+ resultBatch.release();
+ VectorContainer container = loader.getContainer();
+ container.setRecordCount(loader.getRecordCount());
+
+ // Null results? Drill will return a single batch with no rows
+ // and no columns even if the scan (or other) operator returns
+ // no batches at all. For ease of testing, simply map this null
+ // result set to a null output row set that says "nothing at all
+ // was returned." Note that this is different than an empty result
+ // set which has a schema, but no rows.
+
+ if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
+ container.clear();
+ return null;
}
+
+ return DirectRowSet.fromContainer(container);
}
public QueryRowSetIterator rowSetIterator() {
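Callers of rowSet() must therefore distinguish a null return ("nothing at all was returned") from an empty result set that still carries a schema. A hedged usage sketch; the query string is hypothetical, and client is assumed to be the surrounding test fixture's client:

RowSet result = client.queryBuilder()
    .sql("SELECT * FROM cp.`employee.json` WHERE 1 = 0") // hypothetical query
    .rowSet();
if (result == null) {
  // Drill returned the single no-rows, no-columns batch
} else {
  try {
    // an empty result that still has a schema lands here
  } finally {
    result.clear(); // release the underlying vectors
  }
}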
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index 03ae625..c8d8459 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -19,7 +19,6 @@ package org.apache.drill.test;
import java.util.Iterator;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetFormatter;
@@ -87,16 +86,12 @@ public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<Dir
// Unload the batch and convert to a row set.
final RecordBatchLoader loader = new RecordBatchLoader(allocator);
- try {
- loader.load(batch.getHeader().getDef(), batch.getData());
- batch.release();
- batch = null;
- VectorContainer container = loader.getContainer();
- container.setRecordCount(loader.getRecordCount());
- return DirectRowSet.fromContainer(container);
- } catch (SchemaChangeException e) {
- throw new IllegalStateException(e);
- }
+ loader.load(batch.getHeader().getDef(), batch.getData());
+ batch.release();
+ batch = null;
+ VectorContainer container = loader.getContainer();
+ container.setRecordCount(loader.getRecordCount());
+ return DirectRowSet.fromContainer(container);
}
public void printAll() {
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 68c944a..6a6aaf8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -40,7 +40,6 @@ import org.apache.calcite.avatica.util.Cursor;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
@@ -88,13 +87,13 @@ public class DrillCursor implements Cursor {
// that the _query_ has _terminated_ (not necessarily _completing_
// normally), while some uses imply that it's some other state of the
// ResultListener. Some uses seem redundant.)
- volatile boolean completed = false;
+ volatile boolean completed;
/** Whether throttling of incoming data is active. */
- private final AtomicBoolean throttled = new AtomicBoolean( false );
+ private final AtomicBoolean throttled = new AtomicBoolean(false);
private volatile ConnectionThrottle throttle;
- private volatile boolean closed = false;
+ private volatile boolean closed;
private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
@@ -102,7 +101,7 @@ public class DrillCursor implements Cursor {
Queues.newLinkedBlockingDeque();
private final DrillCursor parent;
- Stopwatch elapsedTimer = null;
+ Stopwatch elapsedTimer;
/**
* ...
@@ -111,11 +110,11 @@ public class DrillCursor implements Cursor {
* @param batchQueueThrottlingThreshold
* queue size threshold for throttling server
*/
- ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold ) {
+ ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold) {
this.parent = parent;
instanceId = nextInstanceId++;
this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
- logger.debug( "[#{}] Query listener created.", instanceId );
+ logger.debug("[#{}] Query listener created.", instanceId);
}
/**
@@ -123,9 +122,9 @@ public class DrillCursor implements Cursor {
* @param throttle the "throttlable" object to throttle
* @return true if actually started (wasn't throttling already)
*/
- private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
- final boolean started = throttled.compareAndSet( false, true );
- if ( started ) {
+ private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
+ final boolean started = throttled.compareAndSet(false, true);
+ if (started) {
this.throttle = throttle;
throttle.setAutoRead(false);
}
@@ -137,8 +136,8 @@ public class DrillCursor implements Cursor {
* @return true if actually stopped (was throttling)
*/
private boolean stopThrottlingIfSo() {
- final boolean stopped = throttled.compareAndSet( true, false );
- if ( stopped ) {
+ final boolean stopped = throttled.compareAndSet(true, false);
+ if (stopped) {
throttle.setAutoRead(true);
throttle = null;
}
@@ -147,10 +146,10 @@ public class DrillCursor implements Cursor {
public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
//Check if a non-zero timeout has been set
- if ( parent.timeoutInMilliseconds > 0 ) {
+ if (parent.timeoutInMilliseconds > 0) {
//Identifying remaining in milliseconds to maintain a granularity close to integer value of timeout
long timeToTimeout = parent.timeoutInMilliseconds - parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS);
- if ( timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
+ if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
}
} else {
@@ -164,25 +163,25 @@ public class DrillCursor implements Cursor {
@Override
public void queryIdArrived(QueryId queryId) {
- logger.debug( "[#{}] Received query ID: {}.",
- instanceId, QueryIdHelper.getQueryId( queryId ) );
+ logger.debug("[#{}] Received query ID: {}.",
+ instanceId, QueryIdHelper.getQueryId(queryId));
this.queryId = queryId;
}
@Override
public void submissionFailed(UserException ex) {
- logger.debug( "Received query failure:", instanceId, ex );
+ logger.debug("Received query failure:", instanceId, ex);
this.executionFailureException = ex;
completed = true;
close();
- logger.info( "[#{}] Query failed: ", instanceId, ex );
+ logger.info("[#{}] Query failed: ", instanceId, ex);
}
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
lastReceivedBatchNumber++;
- logger.debug( "[#{}] Received query data batch #{}: {}.",
- instanceId, lastReceivedBatchNumber, result );
+ logger.debug("[#{}] Received query data batch #{}: {}.",
+ instanceId, lastReceivedBatchNumber, result);
// If we're in a closed state, just release the message.
if (closed) {
@@ -197,10 +196,10 @@ public class DrillCursor implements Cursor {
batchQueue.add(result);
// Throttle server if queue size has exceed threshold.
- if (batchQueue.size() > batchQueueThrottlingThreshold ) {
- if ( startThrottlingIfNot( throttle ) ) {
- logger.debug( "[#{}] Throttling started at queue size {}.",
- instanceId, batchQueue.size() );
+ if (batchQueue.size() > batchQueueThrottlingThreshold) {
+ if (startThrottlingIfNot(throttle)) {
+ logger.debug("[#{}] Throttling started at queue size {}.",
+ instanceId, batchQueue.size());
}
}
@@ -209,7 +208,7 @@ public class DrillCursor implements Cursor {
@Override
public void queryCompleted(QueryState state) {
- logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
+ logger.debug("[#{}] Received query completion: {}.", instanceId, state);
releaseIfFirst();
completed = true;
}
@@ -218,7 +217,6 @@ public class DrillCursor implements Cursor {
return queryId;
}
-
/**
* Gets the next batch of query results from the queue.
* @return the next batch, or {@code null} after last batch has been returned
@@ -230,8 +228,8 @@ public class DrillCursor implements Cursor {
QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
while (true) {
if (executionFailureException != null) {
- logger.debug( "[#{}] Dequeued query failure exception: {}.",
- instanceId, executionFailureException );
+ logger.debug("[#{}] Dequeued query failure exception: {}.",
+ instanceId, executionFailureException);
throw executionFailureException;
}
if (completed && batchQueue.isEmpty()) {
@@ -240,23 +238,23 @@ public class DrillCursor implements Cursor {
QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
if (qdb != null) {
lastDequeuedBatchNumber++;
- logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
- instanceId, lastDequeuedBatchNumber, qdb );
+ logger.debug("[#{}] Dequeued query data batch #{}: {}.",
+ instanceId, lastDequeuedBatchNumber, qdb);
// Unthrottle server if queue size has dropped enough below threshold:
- if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+ if (batchQueue.size() < batchQueueThrottlingThreshold / 2
|| batchQueue.size() == 0 // (in case threshold < 2)
- ) {
- if ( stopThrottlingIfSo() ) {
- logger.debug( "[#{}] Throttling stopped at queue size {}.",
- instanceId, batchQueue.size() );
+ ) {
+ if (stopThrottlingIfSo()) {
+ logger.debug("[#{}] Throttling stopped at queue size {}.",
+ instanceId, batchQueue.size());
}
}
return qdb;
}
// Check and throw SQLTimeoutException
- if ( parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds ) {
+ if (parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds) {
throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
}
}
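The reformatted getNext() logic above is a small hysteresis loop: throttle the server once the queue grows past the threshold, and release only after it drains below half the threshold (or empties, for thresholds under 2), so throttling does not flap around the limit. A self-contained sketch of that policy; the class and method names are stand-ins for the listener's batchQueue bookkeeping:

import java.util.concurrent.atomic.AtomicBoolean;

class ThrottlePolicySketch {
  private final AtomicBoolean throttled = new AtomicBoolean(false);
  private final int threshold;

  ThrottlePolicySketch(int threshold) { this.threshold = threshold; }

  // Called after enqueuing a batch: start throttling past the threshold.
  boolean shouldStartThrottling(int queueSize) {
    return queueSize > threshold && throttled.compareAndSet(false, true);
  }

  // Called after dequeuing: stop only once well below the threshold,
  // so throttling does not flap on and off around the limit.
  boolean shouldStopThrottling(int queueSize) {
    return (queueSize < threshold / 2 || queueSize == 0)
        && throttled.compareAndSet(true, false);
  }
}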
@@ -264,11 +262,11 @@ public class DrillCursor implements Cursor {
}
void close() {
- logger.debug( "[#{}] Query listener closing.", instanceId );
+ logger.debug("[#{}] Query listener closing.", instanceId);
closed = true;
- if ( stopThrottlingIfSo() ) {
- logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
- instanceId, batchQueue.size() );
+ if (stopThrottlingIfSo()) {
+ logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
+ instanceId, batchQueue.size());
}
while (!batchQueue.isEmpty()) {
// Don't bother with query timeout, we're closing the cursor
@@ -283,10 +281,9 @@ public class DrillCursor implements Cursor {
firstMessageReceived.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
completed = true;
}
-
}
- private static final Logger logger = getLogger( DrillCursor.class );
+ private static final Logger logger = getLogger(DrillCursor.class);
/** JDBC-specified string for unknown catalog, schema, and table names. */
private static final String UNKNOWN_NAME_STRING = "";
@@ -310,10 +307,10 @@ public class DrillCursor implements Cursor {
private DrillColumnMetaDataList columnMetaDataList;
/** Whether loadInitialSchema() has been called. */
- private boolean initialSchemaLoaded = false;
+ private boolean initialSchemaLoaded;
/** Whether after first batch. (Re skipping spurious empty batches.) */
- private boolean afterFirstBatch = false;
+ private boolean afterFirstBatch;
/**
* Whether the next call to {@code this.}{@link #next()} should just return
@@ -329,10 +326,10 @@ public class DrillCursor implements Cursor {
* and schema before {@code Statement.execute...(...)} even returns.)
* </p>
*/
- private boolean returnTrueForNextCallToNext = false;
+ private boolean returnTrueForNextCallToNext;
/** Whether cursor is after the end of the sequence of records/rows. */
- private boolean afterLastRow = false;
+ private boolean afterLastRow;
private int currentRowNumber = -1;
/** Zero-based offset of current record in record batch.
@@ -340,7 +337,7 @@ public class DrillCursor implements Cursor {
private int currentRecordNumber = -1;
//Track timeout period
- private long timeoutInMilliseconds = 0L;
+ private long timeoutInMilliseconds;
private Stopwatch elapsedTimer;
/**
@@ -357,7 +354,7 @@ public class DrillCursor implements Cursor {
DrillClient client = connection.getClient();
final int batchQueueThrottlingThreshold =
client.getConfig().getInt(
- ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+ ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold);
currentBatchHolder = new RecordBatchLoader(client.getAllocator());
@@ -429,10 +426,10 @@ public class DrillCursor implements Cursor {
final List<Class<?>> getObjectClasses = new ArrayList<>();
// (Can't use modern for loop because, for some incompletely clear reason,
// DrillAccessorList blocks iterator() (throwing exception).)
- for ( int ax = 0; ax < accessors.size(); ax++ ) {
+ for (int ax = 0; ax < accessors.size(); ax++) {
final AvaticaDrillSqlAccessor accessor =
- accessors.get( ax );
- getObjectClasses.add( accessor.getObjectClass() );
+ accessors.get(ax);
+ getObjectClasses.add(accessor.getObjectClass());
}
// Update metadata for result set.
@@ -441,7 +438,7 @@ public class DrillCursor implements Cursor {
UNKNOWN_NAME_STRING, // schema name
UNKNOWN_NAME_STRING, // table name
schema,
- getObjectClasses );
+ getObjectClasses);
if (changeListener != null) {
changeListener.schemaChanged(schema);
@@ -478,7 +475,7 @@ public class DrillCursor implements Cursor {
while (qrb != null
&& (qrb.getHeader().getRowCount() == 0 && qrb.getData() == null)) {
// Empty message--dispose of and try to get another.
- logger.warn( "Spurious batch read: {}", qrb );
+ logger.warn("Spurious batch read: {}", qrb);
qrb.release();
@@ -528,29 +525,22 @@ public class DrillCursor implements Cursor {
return true;
}
}
- catch ( UserException e ) {
+ catch (UserException e) {
// A normally expected case--for any server-side error (e.g., syntax
// error in SQL statement).
// Construct SQLException with message text from the UserException.
// TODO: Map UserException error type to SQLException subclass (once
- // error type is accessible, of course. :-( )
- throw new SQLException( e.getMessage(), e );
+ // error type is accessible, of course. :-( )
+ throw new SQLException(e.getMessage(), e);
}
- catch ( InterruptedException e ) {
+ catch (InterruptedException e) {
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
- throw new SQLException( "Interrupted.", e );
- }
- catch ( SchemaChangeException e ) {
- // TODO: Clean: DRILL-2933: RecordBatchLoader.load(...) no longer
- // throws SchemaChangeException, so check/clean catch clause.
- throw new SQLException(
- "Unexpected SchemaChangeException from RecordBatchLoader.load(...)" );
+ throw new SQLException("Interrupted.", e);
}
- catch ( RuntimeException e ) {
- throw new SQLException( "Unexpected RuntimeException: " + e.toString(), e );
+ catch (RuntimeException e) {
+ throw new SQLException("Unexpected RuntimeException: " + e.toString(), e);
}
-
}
}
@@ -562,9 +552,9 @@ public class DrillCursor implements Cursor {
* <p>
*/
void loadInitialSchema() throws SQLException {
- if ( initialSchemaLoaded ) {
+ if (initialSchemaLoaded) {
throw new IllegalStateException(
- "loadInitialSchema() called a second time" );
+ "loadInitialSchema() called a second time");
}
assert ! afterLastRow : "afterLastRow already true in loadInitialSchema()";
@@ -593,7 +583,7 @@ public class DrillCursor implements Cursor {
try {
resultsListener.awaitFirstMessage();
- } catch ( InterruptedException e ) {
+ } catch (InterruptedException e) {
// Preserve evidence that the interruption occurred so that code higher up
// on the call stack can learn of the interruption and respond to it if it
// wants to.
@@ -601,7 +591,7 @@ public class DrillCursor implements Cursor {
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
- throw new SQLException("Interrupted", e );
+ throw new SQLException("Interrupted", e);
}
returnTrueForNextCallToNext = true;
@@ -620,17 +610,17 @@ public class DrillCursor implements Cursor {
*/
@Override
public boolean next() throws SQLException {
- if ( ! initialSchemaLoaded ) {
+ if (!initialSchemaLoaded) {
throw new IllegalStateException(
- "next() called but loadInitialSchema() was not called" );
+ "next() called but loadInitialSchema() was not called");
}
assert afterFirstBatch : "afterFirstBatch still false in next()";
- if ( afterLastRow ) {
+ if (afterLastRow) {
// We're already after end of rows/records--just report that after end.
return false;
}
- else if ( returnTrueForNextCallToNext ) {
+ else if (returnTrueForNextCallToNext) {
++currentRowNumber;
// We have a deferred "not after end" to report--reset and report that.
returnTrueForNextCallToNext = false;
@@ -666,5 +656,4 @@ public class DrillCursor implements Cursor {
public Stopwatch getElapsedTimer() {
return elapsedTimer;
}
-
}