Posted to commits@drill.apache.org by vo...@apache.org on 2019/12/04 12:55:59 UTC
[drill] 05/11: DRILL-7450: Improve performance for ANALYZE command
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 20293b63c0bb559ae35d57f7cb1ab7fa24e9ee6d
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Fri Nov 22 19:53:08 2019 +0200
DRILL-7450: Improve performance for ANALYZE command
- Implement two-phase aggregation for the lowest metadata aggregate to improve performance
- Allow using complex functions with the hash aggregate
- Use hash aggregation for PHASE_1of2 of ANALYZE to reduce memory usage and avoid sorting non-aggregated data
- Add a sort above the hash aggregation to preserve the correctness of the merge exchange and stream aggregate
closes #1907
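For context, a minimal sketch of issuing the command this change optimizes, over JDBC. The connection URL, table name, and metadata level here are assumptions for illustration, not part of this commit:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AnalyzeExample {
  public static void main(String[] args) throws Exception {
    // Collect FILE-level metadata for a table; with this change the planner
    // may use two-phase (hash + streaming) aggregation for this command.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      stmt.execute("ANALYZE TABLE dfs.tmp.`nation` REFRESH METADATA 'file' LEVEL");
    }
  }
}
```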
---
docs/dev/MetastoreAnalyze.md | 61 ++++-
.../org/apache/drill/exec/expr/IsPredicate.java | 2 +-
.../drill/exec/expr/fn/DrillAggFuncHolder.java | 4 +-
.../expr/fn/DrillComplexWriterAggFuncHolder.java | 40 ++-
.../apache/drill/exec/expr/fn/DrillFuncHolder.java | 78 ++++--
.../drill/exec/metastore/ColumnNamesOptions.java | 80 ++++++
.../metastore/analyze/AnalyzeFileInfoProvider.java | 24 +-
.../metastore/analyze/AnalyzeInfoProvider.java | 20 +-
.../analyze/AnalyzeParquetInfoProvider.java | 18 +-
.../analyze/FileMetadataInfoCollector.java | 3 +-
.../analyze/MetadataAggregateContext.java | 24 ++
.../base/AbstractGroupScanWithMetadata.java | 36 ++-
.../exec/physical/config/HashToMergeExchange.java | 3 +-
...MetadataAggPOP.java => MetadataHashAggPOP.java} | 26 +-
...tadataAggPOP.java => MetadataStreamAggPOP.java} | 19 +-
.../exec/physical/impl/aggregate/HashAggBatch.java | 136 ++++++---
.../physical/impl/aggregate/HashAggTemplate.java | 15 +-
.../physical/impl/aggregate/StreamingAggBatch.java | 11 +-
.../physical/impl/flatten/FlattenRecordBatch.java | 2 +-
...aAggBatch.java => MetadataAggregateHelper.java} | 173 +++++++-----
.../impl/metadata/MetadataControllerBatch.java | 44 +--
.../impl/metadata/MetadataHandlerBatch.java | 49 ++--
.../impl/metadata/MetadataHashAggBatch.java | 56 ++++
...eator.java => MetadataHashAggBatchCreator.java} | 8 +-
.../impl/metadata/MetadataStreamAggBatch.java | 62 +++++
...tor.java => MetadataStreamAggBatchCreator.java} | 8 +-
.../physical/impl/project/ProjectRecordBatch.java | 4 +-
.../physical/impl/validate/BatchValidator.java | 8 +
.../resultSet/model/single/BaseReaderBuilder.java | 2 +-
.../exec/physical/rowSet/RowSetFormatter.java | 13 +-
.../apache/drill/exec/planner/PlannerPhase.java | 2 +
.../logical/ConvertCountToDirectScanRule.java | 2 +-
.../ConvertMetadataAggregateToDirectScanRule.java | 271 ++++++++++++++++++
.../planner/physical/DrillDistributionTrait.java | 81 +++++-
.../drill/exec/planner/physical/HashAggPrule.java | 14 +-
.../drill/exec/planner/physical/HashPrelUtil.java | 37 +--
.../exec/planner/physical/MetadataAggPrule.java | 202 ++++++++++++--
.../planner/physical/MetadataHandlerPrule.java | 2 +-
...tadataAggPrel.java => MetadataHashAggPrel.java} | 22 +-
...dataAggPrel.java => MetadataStreamAggPrel.java} | 21 +-
.../drill/exec/planner/physical/PrelUtil.java | 27 +-
.../sql/handlers/MetastoreAnalyzeTableHandler.java | 42 +--
.../apache/drill/exec/store/ColumnExplorer.java | 25 +-
.../store/parquet/AbstractParquetGroupScan.java | 98 +++++--
.../ParquetFileTableMetadataProviderBuilder.java | 4 +
.../exec/store/pojo/DynamicPojoRecordReader.java | 25 +-
.../drill/exec/fn/impl/TestAggregateFunction.java | 2 +-
.../drill/exec/fn/impl/TestAggregateFunctions.java | 206 ++++++++++----
.../physical/impl/agg/TestAggWithAnyValue.java | 304 ++++++++++++++++-----
.../physical/impl/agg/TestHashAggEmitOutcome.java | 205 +++++++-------
.../drill/exec/sql/TestMetastoreCommands.java | 285 +++++++++++++++----
.../test/resources/functions/test_covariance.json | 6 +-
.../resources/functions/test_logical_aggr.json | 6 +-
.../src/main/codegen/templates/ComplexWriters.java | 6 +-
.../expression/FunctionHolderExpression.java | 2 +-
.../common/logical/data/MetadataAggregate.java | 2 +-
56 files changed, 2225 insertions(+), 703 deletions(-)
diff --git a/docs/dev/MetastoreAnalyze.md b/docs/dev/MetastoreAnalyze.md
index ec27c50..23b2b32 100644
--- a/docs/dev/MetastoreAnalyze.md
+++ b/docs/dev/MetastoreAnalyze.md
@@ -94,24 +94,63 @@ Analyze command specific operators:
- `MetadataControllerBatch` - responsible for converting obtained metadata, fetching absent metadata from the Metastore
and storing resulting metadata into the Metastore.
-`MetastoreAnalyzeTableHandler` forms plan depending on segments count in the following form:
+`MetastoreAnalyzeTableHandler` forms a plan, which depends on the segment count, in the following form:
```
-MetadataControllerBatch
+MetadataControllerRel
...
- MetadataHandlerBatch
- MetadataAggBatch(dir0, ...)
- MetadataHandlerBatch
- MetadataAggBatch(dir0, dir1, ...)
- MetadataHandlerBatch
- MetadataAggBatch(dir0, dir1, fqn, ...)
- Scan(DYNAMIC_STAR **, ANY fqn, ...)
+ MetadataHandlerRel
+ MetadataAggRel(dir0, ...)
+ MetadataHandlerRel
+ MetadataAggRel(dir0, dir1, ...)
+ MetadataHandlerRel
+ MetadataAggRel(dir0, dir1, fqn, ...)
+ DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
```
-The lowest `MetadataAggBatch` creates required aggregate calls for every (or interesting only) table columns
+When `ANALYZE` uses columns for which statistics are present in the Parquet metadata,
+the `ConvertMetadataAggregateToDirectScanRule` rule is applied to the
+
+```
+MetadataAggRel(dir0, dir1, fqn, ...)
+ DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
+```
+
+part of the plan and converts it to a `DrillDirectScanRel` populated with row group metadata when `ANALYZE`
+was run for the `ROW_GROUP` metadata level.
+When the metadata level in `ANALYZE` is not `ROW_GROUP`, the plan above is converted into the following plan:
+
+```
+MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false)
+ DrillDirectScanRel
+```
+
+When this plan is converted into the physical plan, two-phase aggregation may be used when the incoming row
+count is greater than the `planner.slice_target` option value. In this case, the lowest aggregation is a hash
+aggregation and is executed on the same minor fragments where the scan is produced. A `Sort` operator is
+placed above the hash aggregation. A `HashToMergeExchange` operator above the `Sort` sends the aggregated, sorted data to the
+stream aggregate above.
+
+Example of the resulting plan:
+
+```
+MetadataControllerPrel
+ ...
+ MetadataStreamAggPrel(PHASE_1of1)
+ SortPrel
+ MetadataHandlerPrel
+ MetadataStreamAggPrel(PHASE_2of2)
+ HashToMergeExchangePrel
+ SortPrel
+ MetadataHashAggPrel(PHASE_1of2)
+ ScanPrel
+```
+
+The lowest `MetadataStreamAggBatch` (or `MetadataHashAggBatch` in the case of two-phase aggregation with
+a `MetadataStreamAggBatch` above) creates the required aggregate calls for every table column (or only the interesting ones)
and produces aggregations grouped by the segment columns that correspond to the specific table level.
`MetadataHandlerBatch` above it populates the batch with additional information about the metadata type and other details.
-`MetadataAggBatch` above merges metadata calculated before to obtain metadata for parent metadata levels and also stores incoming data to populate it to the Metastore later.
+The `MetadataStreamAggBatch` above merges previously calculated metadata to obtain metadata for parent metadata levels and also stores the incoming data so that it can be written to the Metastore later.
`MetadataControllerBatch` obtains all calculated metadata, converts it to a suitable form and sends it to the Metastore.
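A rough sketch of the phase decision described above (the method and parameter names are illustrative assumptions, not the planner's actual code):

```
// Sketch: two-phase aggregation is chosen when the estimated incoming row
// count exceeds the planner.slice_target option value.
static boolean useTwoPhaseAggregation(double estimatedRowCount, long sliceTarget) {
  // true  -> MetadataHashAggPrel(PHASE_1of2) below a Sort/HashToMergeExchangePrel,
  //          MetadataStreamAggPrel(PHASE_2of2) above
  // false -> a single MetadataStreamAggPrel(PHASE_1of1)
  return estimatedRowCount > sliceTarget;
}
```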
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
index 21e012e..aa0b9fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
@@ -71,7 +71,7 @@ public class IsPredicate<C extends Comparable<C>> extends LogicalExpressionBase
* @param stat statistics object
* @return <tt>true</tt> if the input stat object is null or has invalid statistics; false otherwise
*/
- static boolean isNullOrEmpty(ColumnStatistics stat) {
+ public static boolean isNullOrEmpty(ColumnStatistics stat) {
return stat == null
|| !stat.contains(ColumnStatisticsKind.MIN_VALUE)
|| !stat.contains(ColumnStatisticsKind.MAX_VALUE)
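Widening `isNullOrEmpty` to `public` lets callers outside the package, such as the new `ConvertMetadataAggregateToDirectScanRule`, guard against unusable statistics. A hedged usage sketch (the wrapper method is an assumption for illustration):

```
// Hypothetical caller: check a column's statistics before reading MIN/MAX values.
static boolean hasUsableStatistics(ColumnStatistics stat) {
  return !IsPredicate.isNullOrEmpty(stat);
}
```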
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
index aac0abb..6203cd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
@@ -213,9 +213,7 @@ class DrillAggFuncHolder extends DrillFuncHolder {
declareVarArgArray(g.getModel(), sub, inputVariables);
}
for (int i = 0; i < inputVariables.length; i++) {
- ValueReference parameter = getAttributeParameter(i);
- HoldingContainer inputVariable = inputVariables[i];
- declare(sub, parameter, inputVariable.getHolder().type(), inputVariable.getHolder(), i);
+ declare(g.getModel(), sub, inputVariables[i], i);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
index 8439983..eec5848 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate;
import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
@@ -56,10 +58,18 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
@Override
public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, FieldReference fieldReference) {
- if (!classGenerator.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation.
- JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+ JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
- complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+ complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+
+ if (classGenerator.getMappingSet().isHashAggMapping()) {
+ // The default name is "col" if no reference name was passed in for the output vector.
+ String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
+ JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
+ classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+
+ return super.renderStart(classGenerator, inputVariables, fieldReference);
+ } else { //Declare workspace vars for non-hash-aggregation.
writerIdx = classGenerator.declareClassField("writerIdx", classGenerator.getModel()._ref(int.class));
lastWriterIdx = classGenerator.declareClassField("lastWriterIdx", classGenerator.getModel()._ref(int.class));
//Default name is "col", if not passed in a reference name for the output vector.
@@ -72,8 +82,6 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
JVar[] workspaceJVars = declareWorkspaceVariables(classGenerator);
generateBody(classGenerator, ClassGenerator.BlockType.SETUP, setup(), null, workspaceJVars, true);
return workspaceJVars;
- } else {
- return super.renderStart(classGenerator, inputVariables, fieldReference);
}
}
@@ -84,26 +92,33 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
getRegisteredNames()[0]));
JBlock sub = new JBlock(true, true);
- JBlock topSub = sub;
JClass aggBatchClass = null;
if (classGenerator.getCodeGenerator().getDefinition() == StreamingAggTemplate.TEMPLATE_DEFINITION) {
aggBatchClass = classGenerator.getModel().ref(StreamingAggBatch.class);
+ } else if (classGenerator.getCodeGenerator().getDefinition() == HashAggTemplate.TEMPLATE_DEFINITION) {
+ aggBatchClass = classGenerator.getModel().ref(HashAggBatch.class);
}
- assert aggBatchClass != null : "ComplexWriterAggFuncHolder should only be used with Streaming Aggregate Operator";
JExpression aggBatch = JExpr.cast(aggBatchClass, classGenerator.getMappingSet().getOutgoing());
classGenerator.getSetupBlock().add(aggBatch.invoke("addComplexWriter").arg(complexWriter));
// Only set the writer if there is a position change. Calling setPosition may cause underlying writers to allocate
// new vectors, thereby, losing the previously stored values
- JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
- condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
- condAssignCW.assign(lastWriterIdx, writerIdx);
+ if (classGenerator.getMappingSet().isHashAggMapping()) {
+ classGenerator.getEvalBlock().add(
+ complexWriter
+ .invoke("setPosition")
+ .arg(classGenerator.getMappingSet().getWorkspaceIndex()));
+ } else {
+ JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
+ condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
+ condAssignCW.assign(lastWriterIdx, writerIdx);
+ }
sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
// add the subblock after the out declaration.
- classGenerator.getEvalBlock().add(topSub);
+ classGenerator.getEvalBlock().add(sub);
addProtectedBlock(classGenerator, sub, add(), inputVariables, workspaceJVars, false);
classGenerator.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//",
@@ -124,7 +139,8 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
JExpr._new(classGenerator.getHolderType(getReturnType())));
}
classGenerator.getEvalBlock().add(sub);
- if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE) {
+ if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE
+ && !classGenerator.getMappingSet().isHashAggMapping()) {
sub.assignPlus(writerIdx, JExpr.lit(1));
}
addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 4d758f8..91f7cea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -218,43 +218,21 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
if (decConstInputOnly && !inputVariables[i].isConstant()) {
continue;
}
-
- ValueReference parameter = getAttributeParameter(i);
- HoldingContainer inputVariable = inputVariables[i];
- if (parameter.isFieldReader() && ! inputVariable.isReader()
- && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
- JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
- inputVariable.getMajorType().getMode()));
- JType fieldReadClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
- JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
- declare(sub, parameter, fieldReadClass, reader, i);
- } else if (!parameter.isFieldReader() && inputVariable.isReader() && Types.isComplex(parameter.getType())) {
- // For complex data-types (repeated maps/lists/dicts) the input to the aggregate will be a FieldReader. However, aggregate
- // functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map
- // from FieldReader to respective Holder.
- if (Types.isComplex(parameter.getType())) {
- JType holderClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
- JAssignmentTarget holderVar = declare(sub, parameter, holderClass, JExpr._new(holderClass), i);
- sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
- }
- } else {
- JExpression exprToAssign = inputVariable.getHolder();
- if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
- exprToAssign = exprToAssign.ref("reader");
- }
- declare(sub, parameter, inputVariable.getHolder().type(), exprToAssign, i);
- }
+ declare(g.getModel(), sub, inputVariables[i], i);
}
}
JVar[] internalVars = new JVar[workspaceJVars.length];
for (int i = 0; i < workspaceJVars.length; i++) {
if (decConstInputOnly) {
- internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
+ internalVars[i] = sub.decl(
+ g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
+ attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
} else {
- internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
+ internalVars[i] = sub.decl(
+ g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
+ attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
}
-
}
Preconditions.checkNotNull(body);
@@ -267,6 +245,48 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
}
/**
+ * Declares the attribute parameter that corresponds to the specified {@code currentIndex}
+ * in the specified {@code jBlock}, taking its type into account.
+ *
+ * @param model code model to generate the code
+ * @param jBlock block of code to be populated
+ * @param inputVariable input variable for current function
+ * @param currentIndex index of current parameter
+ */
+ protected void declare(JCodeModel model, JBlock jBlock,
+ HoldingContainer inputVariable, int currentIndex) {
+ ValueReference parameter = getAttributeParameter(currentIndex);
+ if (parameter.isFieldReader()
+ && !inputVariable.isReader()
+ && !Types.isComplex(inputVariable.getMajorType())
+ && inputVariable.getMinorType() != MinorType.UNION) {
+ JType singularReaderClass = model._ref(
+ TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
+ inputVariable.getMajorType().getMode()));
+ JType fieldReadClass = getParamClass(model, parameter, inputVariable.getHolder().type());
+ JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
+ declare(jBlock, parameter, fieldReadClass, reader, currentIndex);
+ } else if (!parameter.isFieldReader()
+ && inputVariable.isReader()
+ && Types.isComplex(parameter.getType())) {
+ // For complex data types (repeated maps/lists/dicts) the input to the aggregate will be a FieldReader. However, aggregate
+ // functions like ANY_VALUE will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code to map
+ // from the FieldReader to the respective Holder.
+ if (Types.isComplex(parameter.getType())) {
+ JType holderClass = getParamClass(model, parameter, inputVariable.getHolder().type());
+ JAssignmentTarget holderVar = declare(jBlock, parameter, holderClass, JExpr._new(holderClass), currentIndex);
+ jBlock.assign(holderVar.ref("reader"), inputVariable.getHolder());
+ }
+ } else {
+ JExpression exprToAssign = inputVariable.getHolder();
+ if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
+ exprToAssign = exprToAssign.ref("reader");
+ }
+ declare(jBlock, parameter, inputVariable.getHolder().type(), exprToAssign, currentIndex);
+ }
+ }
+
+ /**
* Declares array for storing vararg function arguments.
*
* @param model code model to generate the code
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java
new file mode 100644
index 0000000..0b9faca
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.metastore;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import java.util.StringJoiner;
+
+/**
+ * Holds system / session options that are used for obtaining partition / implicit / special column names.
+ */
+public class ColumnNamesOptions {
+ private final String fullyQualifiedName;
+ private final String partitionColumnNameLabel;
+ private final String rowGroupIndex;
+ private final String rowGroupStart;
+ private final String rowGroupLength;
+ private final String lastModifiedTime;
+
+ public ColumnNamesOptions(OptionManager optionManager) {
+ this.fullyQualifiedName = optionManager.getOption(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL).string_val;
+ this.partitionColumnNameLabel = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+ this.rowGroupIndex = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL).string_val;
+ this.rowGroupStart = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL).string_val;
+ this.rowGroupLength = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL).string_val;
+ this.lastModifiedTime = optionManager.getOption(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL).string_val;
+ }
+
+ public String partitionColumnNameLabel() {
+ return partitionColumnNameLabel;
+ }
+
+ public String fullyQualifiedName() {
+ return fullyQualifiedName;
+ }
+
+ public String rowGroupIndex() {
+ return rowGroupIndex;
+ }
+
+ public String rowGroupStart() {
+ return rowGroupStart;
+ }
+
+ public String rowGroupLength() {
+ return rowGroupLength;
+ }
+
+ public String lastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", ColumnNamesOptions.class.getSimpleName() + "[", "]")
+ .add("fullyQualifiedName='" + fullyQualifiedName + "'")
+ .add("partitionColumnNameLabel='" + partitionColumnNameLabel + "'")
+ .add("rowGroupIndex='" + rowGroupIndex + "'")
+ .add("rowGroupStart='" + rowGroupStart + "'")
+ .add("rowGroupLength='" + rowGroupLength + "'")
+ .add("lastModifiedTime='" + lastModifiedTime + "'")
+ .toString();
+ }
+}
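A sketch of the intended usage, mirroring the `getLocationField` change below (the wrapper method itself is an assumption for illustration):

```
// Resolve implicit column name labels once from the session options,
// instead of passing an OptionManager through every call site.
static SchemaPath locationField(OptionManager options) {
  ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(options);
  return SchemaPath.getSimplePath(columnNamesOptions.fullyQualifiedName()); // e.g. "fqn"
}
```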
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
index 0371cc2..a4bf0ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
@@ -18,17 +18,14 @@
package org.apache.drill.exec.metastore.analyze;
import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -37,7 +34,7 @@ import org.apache.drill.metastore.metadata.MetadataType;
import org.apache.drill.metastore.metadata.TableInfo;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
@@ -49,7 +46,7 @@ import java.util.stream.Collectors;
public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
@Override
- public List<SchemaPath> getSegmentColumns(DrillTable table, OptionManager options) throws IOException {
+ public List<SchemaPath> getSegmentColumns(DrillTable table, ColumnNamesOptions columnNamesOptions) throws IOException {
FormatSelection selection = (FormatSelection) table.getSelection();
FileSelection fileSelection = selection.getSelection();
@@ -57,16 +54,17 @@ public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
fileSelection = FileMetadataInfoCollector.getExpandedFileSelection(fileSelection);
}
- return ColumnExplorer.getPartitionColumnNames(fileSelection, options).stream()
+ return ColumnExplorer.getPartitionColumnNames(fileSelection, columnNamesOptions).stream()
.map(SchemaPath::getSimplePath)
.collect(Collectors.toList());
}
@Override
- public List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options) {
- return Arrays.asList(
- new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL), SqlParserPos.ZERO),
- new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL), SqlParserPos.ZERO));
+ public List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException {
+ List<SchemaPath> projectionList = new ArrayList<>(getSegmentColumns(table, columnNamesOptions));
+ projectionList.add(SchemaPath.getSimplePath(columnNamesOptions.fullyQualifiedName()));
+ projectionList.add(SchemaPath.getSimplePath(columnNamesOptions.lastModifiedTime()));
+ return Collections.unmodifiableList(projectionList);
}
@Override
@@ -78,8 +76,8 @@ public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
}
@Override
- public SchemaPath getLocationField(OptionManager optionManager) {
- return SchemaPath.getSimplePath(optionManager.getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL));
+ public SchemaPath getLocationField(ColumnNamesOptions columnNamesOptions) {
+ return SchemaPath.getSimplePath(columnNamesOptions.fullyQualifiedName());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
index 49b8430..88965c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
@@ -18,13 +18,12 @@
package org.apache.drill.exec.metastore.analyze;
import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.sql.SqlIdentifier;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.metastore.components.tables.BasicTablesRequests;
import org.apache.drill.metastore.metadata.MetadataType;
@@ -42,20 +41,21 @@ public interface AnalyzeInfoProvider {
/**
* Returns list of segment column names for specified {@link DrillTable} table.
*
- * @param table table for which should be returned segment column names
- * @param options option manager
+ * @param table table for which segment column names should be returned
+ * @param columnNamesOptions column names option values
* @return list of segment column names
*/
- List<SchemaPath> getSegmentColumns(DrillTable table, OptionManager options) throws IOException;
+ List<SchemaPath> getSegmentColumns(DrillTable table, ColumnNamesOptions columnNamesOptions) throws IOException;
/**
* Returns list of fields required for ANALYZE.
*
- * @param metadataLevel metadata level for analyze
- * @param options option manager
+ * @param table drill table
+ * @param metadataLevel metadata level for analyze
+ * @param columnNamesOptions column names option values
* @return list of fields required for ANALYZE
*/
- List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options);
+ List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException;
/**
* Returns {@link MetadataInfoCollector} instance for obtaining information about segments, files, etc.
@@ -79,10 +79,10 @@ public interface AnalyzeInfoProvider {
* Provides the schema path to the field that will be used as a location for specific table data;
* for example, for file-based tables it may be `fqn`.
*
- * @param optionManager option manager
+ * @param columnNamesOptions column names option values
* @return location field
*/
- SchemaPath getLocationField(OptionManager optionManager);
+ SchemaPath getLocationField(ColumnNamesOptions columnNamesOptions);
/**
* Returns expression which may be used to determine parent location for specific table data,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
index 4ce1424..f48e9cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
@@ -17,14 +17,14 @@
*/
package org.apache.drill.exec.metastore.analyze;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.metastore.metadata.MetadataType;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -38,12 +38,12 @@ public class AnalyzeParquetInfoProvider extends AnalyzeFileInfoProvider {
public static final String TABLE_TYPE_NAME = "PARQUET";
@Override
- public List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options) {
- List<SqlIdentifier> columnList = new ArrayList<>(super.getProjectionFields(metadataLevel, options));
+ public List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException {
+ List<SchemaPath> columnList = new ArrayList<>(super.getProjectionFields(table, metadataLevel, columnNamesOptions));
if (metadataLevel.includes(MetadataType.ROW_GROUP)) {
- columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL), SqlParserPos.ZERO));
- columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL), SqlParserPos.ZERO));
- columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL), SqlParserPos.ZERO));
+ columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupIndex()));
+ columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupStart()));
+ columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupLength()));
}
return Collections.unmodifiableList(columnList);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
index e9882d1..d9765d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
@@ -154,7 +154,8 @@ public class FileMetadataInfoCollector implements MetadataInfoCollector {
String selectionRoot = selection.getSelection().getSelectionRoot().toUri().getPath();
if (!Objects.equals(metastoreInterestingColumns, interestingColumns)
- && (metastoreInterestingColumns == null || !metastoreInterestingColumns.containsAll(interestingColumns))
+ && metastoreInterestingColumns != null &&
+ (interestingColumns == null || !metastoreInterestingColumns.containsAll(interestingColumns))
|| TableStatisticsKind.ANALYZE_METADATA_LEVEL.getValue(basicRequests.tableMetadata(tableInfo)).compareTo(metadataLevel) != 0) {
// do not update table scan and lists of segments / files / row groups,
// metadata should be recalculated
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
index 46f6df5..9108345 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
import java.util.List;
import java.util.Objects;
@@ -36,12 +37,14 @@ public class MetadataAggregateContext {
private final List<SchemaPath> interestingColumns;
private final List<SchemaPath> excludedColumns;
private final boolean createNewAggregations;
+ private final MetadataType metadataLevel;
public MetadataAggregateContext(MetadataAggregateContextBuilder builder) {
this.groupByExpressions = builder.groupByExpressions;
this.interestingColumns = builder.interestingColumns;
this.createNewAggregations = builder.createNewAggregations;
this.excludedColumns = builder.excludedColumns;
+ this.metadataLevel = builder.metadataLevel;
}
@JsonProperty
@@ -64,6 +67,11 @@ public class MetadataAggregateContext {
return excludedColumns;
}
+ @JsonProperty
+ public MetadataType metadataLevel() {
+ return metadataLevel;
+ }
+
@Override
public String toString() {
return new StringJoiner(",\n", MetadataAggregateContext.class.getSimpleName() + "[", "]")
@@ -78,11 +86,21 @@ public class MetadataAggregateContext {
return new MetadataAggregateContextBuilder();
}
+ public MetadataAggregateContextBuilder toBuilder() {
+ return new MetadataAggregateContextBuilder()
+ .groupByExpressions(groupByExpressions)
+ .interestingColumns(interestingColumns)
+ .createNewAggregations(createNewAggregations)
+ .excludedColumns(excludedColumns)
+ .metadataLevel(metadataLevel);
+ }
+
@JsonPOJOBuilder(withPrefix = "")
public static class MetadataAggregateContextBuilder {
private List<NamedExpression> groupByExpressions;
private List<SchemaPath> interestingColumns;
private Boolean createNewAggregations;
+ private MetadataType metadataLevel;
private List<SchemaPath> excludedColumns;
public MetadataAggregateContextBuilder groupByExpressions(List<NamedExpression> groupByExpressions) {
@@ -90,6 +108,11 @@ public class MetadataAggregateContext {
return this;
}
+ public MetadataAggregateContextBuilder metadataLevel(MetadataType metadataLevel) {
+ this.metadataLevel = metadataLevel;
+ return this;
+ }
+
public MetadataAggregateContextBuilder interestingColumns(List<SchemaPath> interestingColumns) {
this.interestingColumns = interestingColumns;
return this;
@@ -109,6 +132,7 @@ public class MetadataAggregateContext {
Objects.requireNonNull(groupByExpressions, "groupByExpressions were not set");
Objects.requireNonNull(createNewAggregations, "createNewAggregations was not set");
Objects.requireNonNull(excludedColumns, "excludedColumns were not set");
+ Objects.requireNonNull(metadataLevel, "metadataLevel was not set");
return new MetadataAggregateContext(this);
}
}
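A sketch of the extended builder contract, assuming the remaining setters (`createNewAggregations`, `excludedColumns`) mirror the fields shown above; `build()` now fails fast when `metadataLevel` is unset:

```
// Values here are illustrative; metadataLevel is the newly required field.
MetadataAggregateContext context = MetadataAggregateContext.builder()
    .groupByExpressions(groupByExpressions)
    .interestingColumns(interestingColumns)
    .createNewAggregations(true)
    .excludedColumns(Collections.emptyList())
    .metadataLevel(MetadataType.FILE) // build() throws if this is missing
    .build();
```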
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index de13ee5..2371c6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -85,9 +85,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Represents table group scan with metadata usage.
*/
-public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupScan {
+public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvider> extends AbstractFileGroupScan {
- protected TableMetadataProvider metadataProvider;
+ protected P metadataProvider;
// table metadata info
protected TableMetadata tableMetadata;
@@ -118,7 +118,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
this.filter = filter;
}
- protected AbstractGroupScanWithMetadata(AbstractGroupScanWithMetadata that) {
+ protected AbstractGroupScanWithMetadata(AbstractGroupScanWithMetadata<P> that) {
super(that.getUserName());
this.columns = that.columns;
this.filter = that.filter;
@@ -215,7 +215,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
}
@Override
- public TableMetadataProvider getMetadataProvider() {
+ public P getMetadataProvider() {
return metadataProvider;
}
@@ -516,7 +516,13 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
// partition pruning methods start
@Override
public List<SchemaPath> getPartitionColumns() {
- return partitionColumns != null ? partitionColumns : new ArrayList<>();
+ if (partitionColumns == null) {
+ partitionColumns = metadataProvider.getPartitionColumns();
+ if (partitionColumns == null) {
+ partitionColumns = new ArrayList<>();
+ }
+ }
+ return partitionColumns;
}
@JsonIgnore
@@ -567,8 +573,8 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
return ColumnExplorer.isPartitionColumn(optionManager, schemaPath) || implicitColNames.contains(schemaPath.getRootSegmentPath());
}
- // protected methods for internal usage
- protected Map<Path, FileMetadata> getFilesMetadata() {
+ @JsonIgnore
+ public Map<Path, FileMetadata> getFilesMetadata() {
if (files == null) {
files = metadataProvider.getFilesMetadataMap();
}
@@ -583,14 +589,16 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
return tableMetadata;
}
- protected List<PartitionMetadata> getPartitionsMetadata() {
+ @JsonIgnore
+ public List<PartitionMetadata> getPartitionsMetadata() {
if (partitions == null) {
partitions = metadataProvider.getPartitionsMetadata();
}
return partitions;
}
- protected Map<Path, SegmentMetadata> getSegmentsMetadata() {
+ @JsonIgnore
+ public Map<Path, SegmentMetadata> getSegmentsMetadata() {
if (segments == null) {
segments = metadataProvider.getSegmentsMetadataMap();
}
@@ -614,7 +622,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
* This class is responsible for filtering different metadata levels.
*/
protected abstract static class GroupScanWithMetadataFilterer<B extends GroupScanWithMetadataFilterer<B>> {
- protected final AbstractGroupScanWithMetadata source;
+ protected final AbstractGroupScanWithMetadata<? extends TableMetadataProvider> source;
protected boolean matchAllMetadata = false;
@@ -952,7 +960,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
Iterable<T> metadataList,
FilterPredicate<?> filterPredicate,
OptionManager optionManager) {
- List<T> qualifiedFiles = new ArrayList<>();
+ List<T> qualifiedMetadata = new ArrayList<>();
for (T metadata : metadataList) {
TupleMetadata schema = metadata.getSchema();
@@ -983,12 +991,12 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
if (matchAllMetadata) {
matchAllMetadata = match == RowsMatch.ALL;
}
- qualifiedFiles.add(metadata);
+ qualifiedMetadata.add(metadata);
}
- if (qualifiedFiles.isEmpty()) {
+ if (qualifiedMetadata.isEmpty()) {
matchAllMetadata = false;
}
- return qualifiedFiles;
+ return qualifiedMetadata;
}
protected abstract B self();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
index 592f7c3..0828bea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -32,8 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("hash-to-merge-exchange")
-public class HashToMergeExchange extends AbstractExchange{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class);
+public class HashToMergeExchange extends AbstractExchange {
private final LogicalExpression distExpr;
private final List<Ordering> orderExprs;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
similarity index 62%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
index f31735c..35990c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
@@ -20,29 +20,41 @@ package org.apache.drill.exec.physical.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import java.util.Collections;
-@JsonTypeName("metadataAggregate")
-public class MetadataAggPOP extends StreamingAggregate {
+@JsonTypeName("metadataHashAggregate")
+public class MetadataHashAggPOP extends HashAggregate {
private final MetadataAggregateContext context;
+ private final OperatorPhase phase;
@JsonCreator
- public MetadataAggPOP(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("context") MetadataAggregateContext context) {
- super(child, context.groupByExpressions(), Collections.emptyList());
+ public MetadataHashAggPOP(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("context") MetadataAggregateContext context,
+ @JsonProperty("phase") OperatorPhase phase) {
+ super(child, phase, context.groupByExpressions(), Collections.emptyList(), 1.0F);
+ Preconditions.checkArgument(context.createNewAggregations(),
+ "Hash aggregate for metadata collecting should be used only for creating new aggregations.");
this.context = context;
+ this.phase = phase;
}
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new MetadataAggPOP(child, context);
+ return new MetadataHashAggPOP(child, context, phase);
}
@JsonProperty
public MetadataAggregateContext getContext() {
return context;
}
+
+ @JsonProperty
+ public OperatorPhase getPhase() {
+ return phase;
+ }
}
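An illustrative construction of the new phase-aware operator (`child` and `context` are assumed to exist); the precondition limits the hash variant to contexts that create new aggregations:

```
// Phase-1 hash aggregation for ANALYZE; context.createNewAggregations() must
// be true, otherwise the constructor's precondition check fails.
MetadataHashAggPOP hashAgg = new MetadataHashAggPOP(child, context, OperatorPhase.PHASE_1of2);
```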
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
similarity index 72%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
index f31735c..e7220c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
@@ -22,27 +22,36 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import java.util.Collections;
-@JsonTypeName("metadataAggregate")
-public class MetadataAggPOP extends StreamingAggregate {
+@JsonTypeName("metadataStreamAggregate")
+public class MetadataStreamAggPOP extends StreamingAggregate {
private final MetadataAggregateContext context;
+ private final OperatorPhase phase;
@JsonCreator
- public MetadataAggPOP(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("context") MetadataAggregateContext context) {
+ public MetadataStreamAggPOP(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("context") MetadataAggregateContext context,
+ @JsonProperty("phase") OperatorPhase phase) {
super(child, context.groupByExpressions(), Collections.emptyList());
this.context = context;
+ this.phase = phase;
}
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new MetadataAggPOP(child, context);
+ return new MetadataStreamAggPOP(child, context, phase);
}
@JsonProperty
public MetadataAggregateContext getContext() {
return context;
}
+
+ @JsonProperty
+ public OperatorPhase getPhase() {
+ return phase;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index f905687..45c670b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,11 +18,20 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -69,20 +78,25 @@ import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(HashAggBatch.class);
private HashAggregator aggregator;
- private RecordBatch incoming;
+ protected RecordBatch incoming;
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds;
private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch
private final List<Comparator> comparators;
private BatchSchema incomingSchema;
private boolean wasKilled;
+ private List<BaseWriter.ComplexWriter> complexWriters;
- private int numGroupByExprs, numAggrExprs;
+ private int numGroupByExprs;
+ private int numAggrExprs;
+ private boolean firstBatch = true;
// This map saves the mapping between outgoing column and incoming column.
private Map<String, String> columnMapping;
@@ -136,13 +150,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
valuesRowWidth += ((FixedWidthVector) w.getValueVector()).getValueWidth();
}
} else {
- int columnWidth;
+ int columnWidth = 0;
+ TypeProtos.MajorType type = w.getField().getType();
if (columnMapping.get(w.getValueVector().getField().getName()) == null) {
- columnWidth = TypeHelper.getSize(w.getField().getType());
+ if (!Types.isComplex(type)) {
+ columnWidth = TypeHelper.getSize(type);
+ }
} else {
- RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(columnMapping.get(w.getValueVector().getField().getName()));
+ RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer()
+ .getColumn(columnMapping.get(w.getValueVector().getField().getName()));
if (columnSize == null) {
- columnWidth = TypeHelper.getSize(w.getField().getType());
+ columnWidth = TypeHelper.getSize(type);
} else {
columnWidth = columnSize.getAllocSizePerEntry();
}
@@ -214,6 +232,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
@Override
+ public VectorContainer getOutgoingContainer() {
+ return container;
+ }
+
+ @Override
public int getRecordCount() {
if (state == BatchState.DONE) {
return 0;
@@ -222,7 +245,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
@Override
- public void buildSchema() throws SchemaChangeException {
+ public void buildSchema() {
IterOutcome outcome = next(incoming);
switch (outcome) {
case NONE:
@@ -305,7 +328,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
state = BatchState.DONE;
// fall through
case RETURN_OUTCOME:
- return aggregator.getOutcome();
+ // Rebuilds the schema in the case of complex writer expressions,
+ // since vectors are added to the batch at run time
+ IterOutcome outcome = aggregator.getOutcome();
+ switch (outcome) {
+ case OK:
+ case OK_NEW_SCHEMA:
+ if (firstBatch) {
+ if (CollectionUtils.isNotEmpty(complexWriters)) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ outcome = IterOutcome.OK_NEW_SCHEMA;
+ }
+ firstBatch = false;
+ }
+ // fall through
+ default:
+ return outcome;
+ }
case UPDATE_AGGREGATOR:
context.getExecutorState().fail(UserException.unsupportedError()
@@ -316,6 +355,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
.build(logger));
close();
killIncoming(false);
+ firstBatch = false;
return IterOutcome.STOP;
default:
throw new IllegalStateException(String.format("Unknown state %s.", out));
@@ -345,7 +385,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
}
- private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException,
+ @SuppressWarnings("unused") // used in generated code
+ public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
+ complexWriters.add(writer);
+ }
+
+ protected HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException,
IOException {
CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getOptions());
ClassGenerator<HashAggregator> cg = top.getRoot();
@@ -355,8 +400,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
// top.saveCodeForDebugging(true);
container.clear();
- numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
- numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().size() : 0;
+ numGroupByExprs = (getKeyExpressions() != null) ? getKeyExpressions().size() : 0;
+ numAggrExprs = (getValueExpressions() != null) ? getValueExpressions().size() : 0;
aggrExprs = new LogicalExpression[numAggrExprs];
groupByOutFieldIds = new TypedFieldId[numGroupByExprs];
aggrOutFieldIds = new TypedFieldId[numAggrExprs];
@@ -364,7 +409,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
ErrorCollector collector = new ErrorCollectorImpl();
for (int i = 0; i < numGroupByExprs; i++) {
- NamedExpression ne = popConfig.getGroupByExprs().get(i);
+ NamedExpression ne = getKeyExpressions().get(i);
final LogicalExpression expr =
ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
if (expr == null) {
@@ -381,7 +426,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
for (int i = 0; i < numAggrExprs; i++) {
- NamedExpression ne = popConfig.getAggrExprs().get(i);
+ NamedExpression ne = getValueExpressions().get(i);
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
if (expr instanceof IfExpression) {
@@ -396,30 +441,45 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
continue;
}
- final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
- ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
- aggrOutFieldIds[i] = container.add(vv);
+ // Populate the complex writers for complex exprs
+ if (expr instanceof DrillFuncHolderExpr &&
+ ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
+ if (complexWriters == null) {
+ complexWriters = new ArrayList<>();
+ } else {
+ complexWriters.clear();
+ }
+ // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+ ((DrillFuncHolderExpr) expr).setFieldReference(ne.getRef());
+ MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE);
+ container.add(new UntypedNullVector(field, container.getAllocator()));
+ aggrExprs[i] = expr;
+ } else {
+ MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
+ ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ aggrOutFieldIds[i] = container.add(vv);
- aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
+ aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
- if (expr instanceof FunctionHolderExpression) {
- String funcName = ((FunctionHolderExpression) expr).getName();
- if (funcName.equals("sum") || funcName.equals("max") || funcName.equals("min")) {
- extraNonNullColumns++;
- }
- List<LogicalExpression> args = ((FunctionCall) ne.getExpr()).args;
- if (!args.isEmpty()) {
- if (args.get(0) instanceof SchemaPath) {
- columnMapping.put(outputField.getName(), ((SchemaPath) args.get(0)).getAsNamePart().getName());
- } else if (args.get(0) instanceof FunctionCall) {
- FunctionCall functionCall = (FunctionCall) args.get(0);
- if (functionCall.args.get(0) instanceof SchemaPath) {
- columnMapping.put(outputField.getName(), ((SchemaPath) functionCall.args.get(0)).getAsNamePart().getName());
+ if (expr instanceof FunctionHolderExpression) {
+ String funcName = ((FunctionHolderExpression) expr).getName();
+ if (funcName.equals("sum") || funcName.equals("max") || funcName.equals("min")) {
+ extraNonNullColumns++;
+ }
+ List<LogicalExpression> args = ((FunctionCall) ne.getExpr()).args;
+ if (!args.isEmpty()) {
+ if (args.get(0) instanceof SchemaPath) {
+ columnMapping.put(outputField.getName(), ((SchemaPath) args.get(0)).getAsNamePart().getName());
+ } else if (args.get(0) instanceof FunctionCall) {
+ FunctionCall functionCall = (FunctionCall) args.get(0);
+ if (functionCall.args.get(0) instanceof SchemaPath) {
+ columnMapping.put(outputField.getName(), ((SchemaPath) functionCall.args.get(0)).getAsNamePart().getName());
+ }
}
}
+ } else {
+ columnMapping.put(outputField.getName(), ne.getRef().getAsNamePart().getName());
}
- } else {
- columnMapping.put(outputField.getName(), ne.getRef().getAsNamePart().getName());
}
}
@@ -433,7 +493,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
HashTableConfig htConfig =
// TODO - fix the validator on this option
new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
+ HashTable.DEFAULT_LOAD_FACTOR, getKeyExpressions(), null /* no probe exprs */, comparators);
agg.setup(popConfig, htConfig, context, oContext, incoming, this,
aggrExprs,
@@ -445,6 +505,14 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return agg;
}
+ protected List<NamedExpression> getKeyExpressions() {
+ return popConfig.getGroupByExprs();
+ }
+
+ protected List<NamedExpression> getValueExpressions() {
+ return popConfig.getAggrExprs();
+ }
+
private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
cg.setMappingSet(UpdateAggrValuesMapping);
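A note on the two protected getters added above: they turn HashAggBatch's hard-coded reads of popConfig into template-method hooks, so a subclass can supply its own key and value expressions without duplicating the setup logic. A minimal, self-contained sketch of the pattern (class and expression names here are illustrative, not from this commit):

    import java.util.Arrays;
    import java.util.List;

    class BaseAggBatchSketch {
      // Base logic consumes whatever the hook returns.
      protected List<String> getValueExpressions() {
        return Arrays.asList("sum(a)", "count(b)");
      }

      void createAggregator() {
        getValueExpressions().forEach(System.out::println);
      }
    }

    class MetadataAggBatchSketch extends BaseAggBatchSketch {
      // Subclass swaps in generated metadata expressions.
      @Override
      protected List<String> getValueExpressions() {
        return Arrays.asList("any_value(`location`)", "max(`lastModifiedTime`)");
      }
    }

MetadataHashAggBatch, added later in this commit, overrides getValueExpressions() in exactly this way.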
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index d166353..7fcfd99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -34,6 +34,8 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.VectorSerializer.Writer;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -779,12 +781,19 @@ public abstract class HashAggTemplate implements HashAggregator {
while (outgoingIter.hasNext()) {
ValueVector vv = outgoingIter.next().getValueVector();
- AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
+ // Prevent allocating complex vectors here to avoid losing their content
+ // since their writers will still be used in generated code
+ TypeProtos.MajorType majorType = vv.getField().getType();
+ if (!Types.isComplex(majorType)
+ && !Types.isUnion(majorType)
+ && !Types.isRepeated(majorType)) {
+ AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
+ }
}
long memAdded = allocator.getAllocatedMemory() - allocatedBefore;
- if ( memAdded > estOutgoingAllocSize ) {
- logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...",memAdded,estOutgoingAllocSize);
+ if (memAdded > estOutgoingAllocSize) {
+ logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...", memAdded, estOutgoingAllocSize);
estOutgoingAllocSize = memAdded;
}
outContainer.setRecordCount(records);
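The guard above can be read as a standalone predicate; a hedged restatement using the same Types helpers (the method name isPreAllocatable is hypothetical):

    import org.apache.drill.common.types.TypeProtos;
    import org.apache.drill.common.types.Types;

    class AllocationGuardSketch {
      // Only simple vectors are pre-allocated; complex, union and
      // repeated vectors are skipped because their ComplexWriters may
      // still hold content that a fresh allocation would discard.
      static boolean isPreAllocatable(TypeProtos.MajorType type) {
        return !Types.isComplex(type)
            && !Types.isUnion(type)
            && !Types.isRepeated(type);
      }
    }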
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 8fc7118..c3b504a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.HoldingContainerExpression;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
@@ -476,8 +475,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
keyExprs[i] = expr;
MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
expr.getMajorType());
- ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
- keyOutputIds[i] = container.add(vector);
+ container.addOrGet(outputField);
+ keyOutputIds[i] = container.getValueVectorId(ne.getRef());
}
for (int i = 0; i < valueExprs.length; i++) {
@@ -501,15 +500,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
complexWriters.clear();
}
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
- ((DrillFuncHolderExpr) expr).getFieldReference(ne.getRef());
+ ((DrillFuncHolderExpr) expr).setFieldReference(ne.getRef());
MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE);
container.add(new UntypedNullVector(field, container.getAllocator()));
valueExprs[i] = expr;
} else {
MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
expr.getMajorType());
- ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
- TypedFieldId id = container.add(vector);
+ container.addOrGet(outputField);
+ TypedFieldId id = container.getValueVectorId(ne.getRef());
valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
}
}
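The switch from TypeHelper.getNewVector() plus container.add() to container.addOrGet() means a vector is created only when one with the same name does not already exist, so repeated schema setup no longer accumulates duplicate output vectors; the TypedFieldId is then resolved by reference. Condensed, the new shape is (mirroring the lines above):

    // Reuse-or-create instead of always-create:
    container.addOrGet(outputField);
    TypedFieldId id = container.getValueVectorId(ne.getRef());
    valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);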
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index cee7625..34e9d7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -440,7 +440,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
- ((DrillFuncHolderExpr) expr).getFieldReference(namedExpression.getRef());
+ ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef());
cg.addExpr(expr);
} else {
// need to do evaluation.
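The getFieldReference to setFieldReference rename here, in StreamingAggBatch above, and in ProjectRecordBatch below is a pure naming fix: the method mutates the holder expression, storing the reference that later names the ComplexWriter's output vector, so an accessor-style name misrepresented a write. A sketch of the corrected intent (types simplified, not the real class):

    class FuncHolderExprSketch {
      private String fieldReference;

      // Renamed from getFieldReference(ref): a setter should read as one.
      void setFieldReference(String ref) {
        this.fieldReference = ref;
      }
    }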
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
similarity index 68%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
index 51dfb15..1cca788 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
@@ -27,23 +27,17 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.metastore.metadata.MetadataType;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -53,26 +47,32 @@ import java.util.Map;
import java.util.stream.StreamSupport;
/**
- * Operator which adds aggregate calls for all incoming columns to calculate required metadata and produces aggregations.
- * If aggregation is performed on top of another aggregation, required aggregate calls for merging metadata will be added.
+ * Helper class for constructing the aggregate value expressions required for collecting metadata.
*/
-public class MetadataAggBatch extends StreamingAggBatch {
-
- private List<NamedExpression> valueExpressions;
+public class MetadataAggregateHelper {
+ private final List<NamedExpression> valueExpressions;
+ private final MetadataAggregateContext context;
+ private final ColumnNamesOptions columnNamesOptions;
+ private final BatchSchema schema;
+ private final AggPrelBase.OperatorPhase phase;
- public MetadataAggBatch(MetadataAggPOP popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
- super(popConfig, incoming, context);
+ public MetadataAggregateHelper(MetadataAggregateContext context, ColumnNamesOptions columnNamesOptions,
+ BatchSchema schema, AggPrelBase.OperatorPhase phase) {
+ this.context = context;
+ this.columnNamesOptions = columnNamesOptions;
+ this.schema = schema;
+ this.phase = phase;
+ this.valueExpressions = new ArrayList<>();
+ createAggregatorInternal();
}
- @Override
- protected StreamingAggregator createAggregatorInternal()
- throws SchemaChangeException, ClassTransformationException, IOException {
- valueExpressions = new ArrayList<>();
- MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
+ public List<NamedExpression> getValueExpressions() {
+ return valueExpressions;
+ }
- List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+ private void createAggregatorInternal() {
+ List<SchemaPath> excludedColumns = context.excludedColumns();
- BatchSchema schema = incoming.getSchema();
// Iterates through input expressions and adds aggregate calls for table fields
// to collect required statistics (MIN, MAX, COUNT, etc.) or aggregate calls to merge incoming metadata
getUnflattenedFileds(Lists.newArrayList(schema), null)
@@ -90,19 +90,36 @@ public class MetadataAggBatch extends StreamingAggBatch {
fieldsList.add(FieldReference.getWithQuotedRef(filedName));
});
- if (popConfig.getContext().createNewAggregations()) {
+ if (createNewAggregations()) {
addMetadataAggregateCalls();
// infer schema from incoming data
addSchemaCall(fieldsList);
- addNewMetadataAggregations();
+ // adds any_value(`location`) call for SEGMENT level
+ if (context.metadataLevel() == MetadataType.SEGMENT) {
+ addLocationAggCall(columnNamesOptions.fullyQualifiedName());
+ }
} else {
- addCollectListCall(fieldsList);
+ if (!context.createNewAggregations()) {
+ // collects incoming metadata
+ addCollectListCall(fieldsList);
+ }
addMergeSchemaCall();
+ String locationField = MetastoreAnalyzeConstants.LOCATION_FIELD;
+
+ if (context.createNewAggregations()) {
+ locationField = columnNamesOptions.fullyQualifiedName();
+ }
+
+ if (context.metadataLevel() == MetadataType.SEGMENT) {
+ addParentLocationAggCall();
+ } else {
+ addLocationAggCall(locationField);
+ }
}
for (SchemaPath excludedColumn : excludedColumns) {
- if (excludedColumn.equals(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL)))
- || excludedColumn.equals(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL)))) {
+ if (excludedColumn.equals(SchemaPath.getSimplePath(columnNamesOptions.rowGroupStart()))
+ || excludedColumn.equals(SchemaPath.getSimplePath(columnNamesOptions.rowGroupLength()))) {
LogicalExpression lastModifiedTime = new FunctionCall("any_value",
Collections.singletonList(
FieldReference.getWithQuotedRef(excludedColumn.getRootSegmentPath())),
@@ -113,20 +130,69 @@ public class MetadataAggBatch extends StreamingAggBatch {
}
}
- addMaxLastModifiedCall();
+ addLastModifiedCall();
+ }
+
+ /**
+ * Adds any_value(parentPath(`location`)) aggregate call to the value expressions list.
+ */
+ private void addParentLocationAggCall() {
+ valueExpressions.add(
+ new NamedExpression(
+ new FunctionCall(
+ "any_value",
+ Collections.singletonList(
+ new FunctionCall(
+ "parentPath",
+ Collections.singletonList(SchemaPath.getSimplePath(MetastoreAnalyzeConstants.LOCATION_FIELD)),
+ ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN),
+ FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+ }
+
+ /**
+ * Adds any_value(`location`) aggregate call to the value expressions list.
+ *
+ * @param locationField name of the location field
+ */
+ private void addLocationAggCall(String locationField) {
+ valueExpressions.add(
+ new NamedExpression(
+ new FunctionCall(
+ "any_value",
+ Collections.singletonList(SchemaPath.getSimplePath(locationField)),
+ ExpressionPosition.UNKNOWN),
+ FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+ }
- return super.createAggregatorInternal();
+ /**
+ * Checks whether incoming data is not grouped, in which case the corresponding aggregate calls should be created.
+ *
+ * @return {@code true} if incoming data is not grouped, {@code false} otherwise.
+ */
+ private boolean createNewAggregations() {
+ return context.createNewAggregations()
+ && (phase == AggPrelBase.OperatorPhase.PHASE_1of2
+ || phase == AggPrelBase.OperatorPhase.PHASE_1of1);
}
/**
* Adds {@code max(`lastModifiedTime`)} function call to the value expressions list.
*/
- private void addMaxLastModifiedCall() {
- String lastModifiedColumn = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
- LogicalExpression lastModifiedTime = new FunctionCall("max",
- Collections.singletonList(
- FieldReference.getWithQuotedRef(lastModifiedColumn)),
- ExpressionPosition.UNKNOWN);
+ private void addLastModifiedCall() {
+ String lastModifiedColumn = columnNamesOptions.lastModifiedTime();
+ LogicalExpression lastModifiedTime;
+ if (createNewAggregations()) {
+ lastModifiedTime = new FunctionCall("any_value",
+ Collections.singletonList(
+ FieldReference.getWithQuotedRef(lastModifiedColumn)),
+ ExpressionPosition.UNKNOWN);
+ } else {
+ lastModifiedTime = new FunctionCall("max",
+ Collections.singletonList(
+ FieldReference.getWithQuotedRef(lastModifiedColumn)),
+ ExpressionPosition.UNKNOWN);
+ }
valueExpressions.add(new NamedExpression(lastModifiedTime,
FieldReference.getWithQuotedRef(lastModifiedColumn)));
@@ -140,8 +206,7 @@ public class MetadataAggBatch extends StreamingAggBatch {
*/
private void addCollectListCall(List<LogicalExpression> fieldList) {
ArrayList<LogicalExpression> collectListArguments = new ArrayList<>(fieldList);
- MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
- List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+ List<SchemaPath> excludedColumns = context.excludedColumns();
// populate columns which weren't included in the schema, but should be collected to the COLLECTED_MAP_FIELD
for (SchemaPath logicalExpressions : excludedColumns) {
// adds string literal with field name to the list
@@ -170,17 +235,6 @@ public class MetadataAggBatch extends StreamingAggBatch {
}
/**
- * Adds {@code collect_to_list_varchar(`fqn`)} call to collect file paths into the list.
- */
- private void addNewMetadataAggregations() {
- LogicalExpression locationsExpr = new FunctionCall("collect_to_list_varchar",
- Collections.singletonList(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL))),
- ExpressionPosition.UNKNOWN);
-
- valueExpressions.add(new NamedExpression(locationsExpr, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATIONS_FIELD)));
- }
-
- /**
* Adds a call to the {@code schema()} function with the specified fields list
* as its arguments to obtain their schema.
*
@@ -220,8 +274,7 @@ public class MetadataAggBatch extends StreamingAggBatch {
for (MaterializedField field : fields) {
// statistics collecting is not supported for array types
if (field.getType().getMode() != TypeProtos.DataMode.REPEATED) {
- MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
- List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+ List<SchemaPath> excludedColumns = context.excludedColumns();
// excludedColumns are applied for root fields only
if (parentFields != null || !excludedColumns.contains(SchemaPath.getSimplePath(field.getName()))) {
List<String> currentPath;
@@ -231,12 +284,12 @@ public class MetadataAggBatch extends StreamingAggBatch {
currentPath = new ArrayList<>(parentFields);
currentPath.add(field.getName());
}
- if (field.getType().getMinorType() == TypeProtos.MinorType.MAP && popConfig.getContext().createNewAggregations()) {
+ if (field.getType().getMinorType() == TypeProtos.MinorType.MAP && createNewAggregations()) {
fieldNameRefMap.putAll(getUnflattenedFileds(field.getChildren(), currentPath));
} else {
SchemaPath schemaPath = SchemaPath.getCompoundPath(currentPath.toArray(new String[0]));
// adds backticks for createNewAggregations() to ensure that the field will be parsed correctly
- String name = popConfig.getContext().createNewAggregations() ? schemaPath.toExpr() : schemaPath.getRootSegmentPath();
+ String name = createNewAggregations() ? schemaPath.toExpr() : schemaPath.getRootSegmentPath();
fieldNameRefMap.put(name, new FieldReference(schemaPath));
}
}
@@ -254,9 +307,8 @@ public class MetadataAggBatch extends StreamingAggBatch {
* @param fieldName field name
*/
private void addColumnAggregateCalls(FieldReference fieldRef, String fieldName) {
- MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
- List<SchemaPath> interestingColumns = popConfig.getContext().interestingColumns();
- if (popConfig.getContext().createNewAggregations()) {
+ List<SchemaPath> interestingColumns = context.interestingColumns();
+ if (createNewAggregations()) {
if (interestingColumns == null || interestingColumns.contains(fieldRef)) {
// collect statistics for all or only interesting columns if they are specified
AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.forEach((statisticsKind, sqlKind) -> {
@@ -280,9 +332,4 @@ public class MetadataAggBatch extends StreamingAggBatch {
valueExpressions.add(new NamedExpression(functionCall, fieldRef));
}
}
-
- @Override
- protected List<NamedExpression> getValueExpressions() {
- return valueExpressions;
- }
}
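Every emitter in this helper shares one construction shape: argument list, then FunctionCall, then NamedExpression. A self-contained sketch using the same Drill APIs seen above (the literal field name "location" stands in for MetastoreAnalyzeConstants.LOCATION_FIELD):

    import java.util.Collections;
    import org.apache.drill.common.expression.ExpressionPosition;
    import org.apache.drill.common.expression.FieldReference;
    import org.apache.drill.common.expression.FunctionCall;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.common.logical.data.NamedExpression;

    class AggCallSketch {
      // Builds any_value(`location`) AS `location`, as addLocationAggCall() does.
      static NamedExpression locationAggCall() {
        LogicalExpression call = new FunctionCall(
            "any_value",
            Collections.singletonList(SchemaPath.getSimplePath("location")),
            ExpressionPosition.UNKNOWN);
        return new NamedExpression(call,
            FieldReference.getWithQuotedRef("location"));
      }
    }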
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 0c94e3d..9ccae49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -21,8 +21,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
import org.apache.drill.exec.ops.FragmentContext;
@@ -105,6 +105,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
private final Map<String, MetadataInfo> metadataToHandle;
private final StatisticsRecordCollector statisticsCollector;
private final List<TableMetadataUnit> metadataUnits;
+ private final ColumnNamesOptions columnNamesOptions;
private boolean firstLeft = true;
private boolean firstRight = true;
@@ -123,6 +124,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.collect(Collectors.toMap(MetadataInfo::identifier, Function.identity()));
this.metadataUnits = new ArrayList<>();
this.statisticsCollector = new StatisticsCollectorImpl();
+ this.columnNamesOptions = new ColumnNamesOptions(context.getOptions());
}
protected boolean setupNewSchema() {
@@ -132,6 +134,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setEmpty();
return true;
}
@@ -428,7 +431,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
private PartitionMetadata getPartitionMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
List<String> segmentColumns = popConfig.getContext().segmentColumns();
- String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
String segmentKey = segmentColumns.size() > 0
? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -450,7 +452,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.columnsStatistics(columnStatistics)
.metadataStatistics(metadataStatistics)
.locations(getIncomingLocations(reader))
- .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+ .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
// .column(SchemaPath.getSimplePath("dir1"))
// .partitionValues()
.schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
@@ -460,7 +462,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
@SuppressWarnings("unchecked")
private BaseTableMetadata getTableMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
Map<SchemaPath, ColumnStatistics> columnStatistics) {
- String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
List<StatisticsHolder> updatedMetaStats = new ArrayList<>(metadataStatistics);
updatedMetaStats.add(new StatisticsHolder(popConfig.getContext().analyzeMetadataLevel(), TableStatisticsKind.ANALYZE_METADATA_LEVEL));
@@ -477,7 +478,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.partitionKeys(Collections.emptyMap())
.interestingColumns(popConfig.getContext().interestingColumns())
.location(popConfig.getContext().location())
- .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+ .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
.schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
.build();
@@ -494,7 +495,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
private SegmentMetadata getSegmentMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
List<String> segmentColumns = popConfig.getContext().segmentColumns();
- String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
String segmentKey = segmentColumns.size() > 0
? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -521,7 +521,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.locations(getIncomingLocations(reader))
.column(segmentColumns.size() > 0 ? SchemaPath.getSimplePath(segmentColumns.get(nestingLevel - 1)) : null)
.partitionValues(partitionValues)
- .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+ .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
.schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
.build();
}
@@ -529,7 +529,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
private FileMetadata getFileMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
List<String> segmentColumns = popConfig.getContext().segmentColumns();
- String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
String segmentKey = segmentColumns.size() > 0
? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -555,15 +554,13 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.columnsStatistics(columnStatistics)
.metadataStatistics(metadataStatistics)
.path(path)
- .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+ .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
.schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
.build();
}
private RowGroupMetadata getRowGroupMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
- String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
- String rgi = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
List<String> segmentColumns = popConfig.getContext().segmentColumns();
String segmentKey = segmentColumns.size() > 0
@@ -577,7 +574,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
Path path = new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString());
- int rowGroupIndex = Integer.parseInt(reader.column(rgi).scalar().getString());
+ int rowGroupIndex = Integer.parseInt(reader.column(columnNamesOptions.rowGroupIndex()).scalar().getString());
String metadataIdentifier = MetadataIdentifierUtils.getRowGroupMetadataIdentifier(partitionValues, path, rowGroupIndex);
@@ -595,7 +592,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
.hostAffinity(Collections.emptyMap())
.rowGroupIndex(rowGroupIndex)
.path(path)
- .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+ .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
.schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
.build();
}
@@ -644,19 +641,22 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
@SuppressWarnings("unchecked")
private List<StatisticsHolder> getMetadataStatistics(TupleReader reader, TupleMetadata columnMetadata) {
List<StatisticsHolder> metadataStatistics = new ArrayList<>();
- String rgs = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
- String rgl = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
+ String rgs = columnNamesOptions.rowGroupStart();
+ String rgl = columnNamesOptions.rowGroupLength();
for (ColumnMetadata column : columnMetadata) {
String columnName = column.name();
+ ObjectReader objectReader = reader.column(columnName);
if (AnalyzeColumnUtils.isMetadataStatisticsField(columnName)) {
- metadataStatistics.add(new StatisticsHolder(reader.column(columnName).getObject(),
+ metadataStatistics.add(new StatisticsHolder(objectReader.getObject(),
AnalyzeColumnUtils.getStatisticsKind(columnName)));
- } else if (columnName.equals(rgs)) {
- metadataStatistics.add(new StatisticsHolder(Long.parseLong(reader.column(columnName).scalar().getString()),
- new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
- } else if (columnName.equals(rgl)) {
- metadataStatistics.add(new StatisticsHolder(Long.parseLong(reader.column(columnName).scalar().getString()),
- new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+ } else if (!objectReader.isNull()) {
+ if (columnName.equals(rgs)) {
+ metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
+ new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
+ } else if (columnName.equals(rgl)) {
+ metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
+ new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+ }
}
}
return metadataStatistics;
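Two effects of the getMetadataStatistics() hunk above: the column's ObjectReader is fetched once instead of on every access, and the Long.parseLong of row-group start/length is skipped when the value is null, avoiding a parse failure on rows that carry no row-group statistics. Restated in isolation (the column name is illustrative):

    // Null-safe read of a VARCHAR-encoded long statistic:
    ObjectReader col = reader.column("rgs");  // fetched once
    if (!col.isNull()) {
      long start = Long.parseLong(col.scalar().getString());
      // record the statistic here
    }
    // a null value simply contributes no statistic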
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 7444a04..22e90fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.physical.impl.metadata;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MetadataHandlerPOP;
@@ -81,6 +81,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
private final Tables tables;
private final MetadataType metadataType;
private final Map<String, MetadataInfo> metadataToHandle;
+ private final ColumnNamesOptions columnNamesOptions;
private boolean firstBatch = true;
@@ -89,6 +90,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
super(popConfig, context, incoming);
this.tables = context.getMetastoreRegistry().get().tables();
this.metadataType = popConfig.getContext().metadataType();
+ this.columnNamesOptions = new ColumnNamesOptions(context.getOptions());
this.metadataToHandle = popConfig.getContext().metadataToHandle() != null
? popConfig.getContext().metadataToHandle().stream()
.collect(Collectors.toMap(MetadataInfo::identifier, Function.identity()))
@@ -101,7 +103,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
// 2. For the case when incoming operator returned nothing - no updated underlying metadata was found.
// 3. Fetches metadata which should be handled but wasn't returned by incoming batch from the Metastore
- IterOutcome outcome = next(incoming);
+ IterOutcome outcome = incoming.getRecordCount() == 0 ? next(incoming) : getLastKnownOutcome();
switch (outcome) {
case NONE:
@@ -117,8 +119,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
outcome = IterOutcome.OK;
}
}
- doWorkInternal();
- return outcome;
+ // fall thru
case OK:
assert !firstBatch : "First batch should be OK_NEW_SCHEMA";
doWorkInternal();
@@ -286,14 +287,14 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
}
if (metadataType == MetadataType.ROW_GROUP) {
- schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL), MinorType.VARCHAR);
- schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL), MinorType.VARCHAR);
- schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL), MinorType.VARCHAR);
+ schemaBuilder.addNullable(columnNamesOptions.rowGroupIndex(), MinorType.VARCHAR);
+ schemaBuilder.addNullable(columnNamesOptions.rowGroupStart(), MinorType.VARCHAR);
+ schemaBuilder.addNullable(columnNamesOptions.rowGroupLength(), MinorType.VARCHAR);
}
schemaBuilder
.addNullable(MetastoreAnalyzeConstants.SCHEMA_FIELD, MinorType.VARCHAR)
- .addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL), MinorType.VARCHAR)
+ .addNullable(columnNamesOptions.lastModifiedTime(), MinorType.VARCHAR)
.add(MetastoreAnalyzeConstants.METADATA_TYPE, MinorType.VARCHAR);
ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
@@ -307,11 +308,6 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
private <T extends BaseMetadata & LocationProvider> VectorContainer writeMetadataUsingBatchSchema(List<T> metadataList) {
Preconditions.checkArgument(!metadataList.isEmpty(), "Metadata list shouldn't be empty.");
- String lastModifiedTimeField = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
- String rgiField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
- String rgsField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
- String rglField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
-
ResultSetLoader resultSetLoader = getResultSetLoaderWithBatchSchema();
resultSetLoader.startBatch();
RowSetLoader rowWriter = resultSetLoader.writer();
@@ -352,13 +348,13 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
arguments.add(new Object[]{});
} else if (fieldName.equals(MetastoreAnalyzeConstants.SCHEMA_FIELD)) {
arguments.add(metadata.getSchema().jsonString());
- } else if (fieldName.equals(lastModifiedTimeField)) {
+ } else if (fieldName.equals(columnNamesOptions.lastModifiedTime())) {
arguments.add(String.valueOf(metadata.getLastModifiedTime()));
- } else if (fieldName.equals(rgiField)) {
+ } else if (fieldName.equals(columnNamesOptions.rowGroupIndex())) {
arguments.add(String.valueOf(((RowGroupMetadata) metadata).getRowGroupIndex()));
- } else if (fieldName.equals(rgsField)) {
+ } else if (fieldName.equals(columnNamesOptions.rowGroupStart())) {
arguments.add(Long.toString(metadata.getStatistic(() -> ExactStatisticsConstants.START)));
- } else if (fieldName.equals(rglField)) {
+ } else if (fieldName.equals(columnNamesOptions.rowGroupLength())) {
arguments.add(Long.toString(metadata.getStatistic(() -> ExactStatisticsConstants.LENGTH)));
} else if (fieldName.equals(MetastoreAnalyzeConstants.METADATA_TYPE)) {
arguments.add(metadataType.name());
@@ -374,10 +370,6 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
}
private ResultSetLoader getResultSetLoaderWithBatchSchema() {
- String lastModifiedTimeField = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
- String rgiField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
- String rgsField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
- String rglField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
SchemaBuilder schemaBuilder = new SchemaBuilder();
// adds fields to the schema preserving their order to avoid issues in outgoing batches
for (VectorWrapper<?> vectorWrapper : container) {
@@ -385,10 +377,10 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
String fieldName = field.getName();
if (fieldName.equals(MetastoreAnalyzeConstants.LOCATION_FIELD)
|| fieldName.equals(MetastoreAnalyzeConstants.SCHEMA_FIELD)
- || fieldName.equals(lastModifiedTimeField)
- || fieldName.equals(rgiField)
- || fieldName.equals(rgsField)
- || fieldName.equals(rglField)
+ || fieldName.equals(columnNamesOptions.lastModifiedTime())
+ || fieldName.equals(columnNamesOptions.rowGroupIndex())
+ || fieldName.equals(columnNamesOptions.rowGroupStart())
+ || fieldName.equals(columnNamesOptions.rowGroupLength())
|| fieldName.equals(MetastoreAnalyzeConstants.METADATA_TYPE)
|| popConfig.getContext().segmentColumns().contains(fieldName)) {
schemaBuilder.add(fieldName, field.getType().getMinorType(), field.getDataMode());
@@ -416,9 +408,9 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
container.clear();
StreamSupport.stream(populatedContainer.spliterator(), false)
.map(VectorWrapper::getField)
- .filter(field -> field.getType().getMinorType() != MinorType.NULL)
.forEach(container::addOrGet);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setEmpty();
}
protected boolean setupNewSchema() {
@@ -445,18 +437,17 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
}
private void updateMetadataToHandle() {
- RowSetReader reader = DirectRowSet.fromContainer(container).reader();
// updates metadataToHandle to be able to fetch required data which wasn't returned by incoming batch
if (metadataToHandle != null && !metadataToHandle.isEmpty()) {
+ RowSetReader reader = DirectRowSet.fromContainer(container).reader();
switch (metadataType) {
case ROW_GROUP: {
- String rgiColumnName = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
while (reader.next() && !metadataToHandle.isEmpty()) {
List<String> partitionValues = popConfig.getContext().segmentColumns().stream()
.map(columnName -> reader.column(columnName).scalar().getString())
.collect(Collectors.toList());
Path location = new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString());
- int rgi = Integer.parseInt(reader.column(rgiColumnName).scalar().getString());
+ int rgi = Integer.parseInt(reader.column(columnNamesOptions.rowGroupIndex()).scalar().getString());
metadataToHandle.remove(MetadataIdentifierUtils.getRowGroupMetadataIdentifier(partitionValues, location, rgi));
}
break;
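Two behavioral points in the MetadataHandlerBatch hunks above: next(incoming) is now invoked only when the incoming batch is empty, otherwise the last known outcome is reused so a batch that is already present is not skipped; and OK_NEW_SCHEMA now falls through into OK so both outcomes share doWorkInternal(). A compressed sketch of the resulting switch (setupNewSchema() abbreviates the first-batch handling):

    switch (outcome) {
      case OK_NEW_SCHEMA:
        setupNewSchema();  // first batch only
        // fall thru
      case OK:
        doWorkInternal();
        return outcome;
      default:
        return outcome;
    }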
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
new file mode 100644
index 0000000..7c83b2d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.metadata;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggregator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.IOException;
+import java.util.List;
+
+public class MetadataHashAggBatch extends HashAggBatch {
+ private List<NamedExpression> valueExpressions;
+
+ public MetadataHashAggBatch(MetadataHashAggPOP popConfig, RecordBatch incoming, FragmentContext context) {
+ super(popConfig, incoming, context);
+ }
+
+ @Override
+ protected HashAggregator createAggregatorInternal()
+ throws SchemaChangeException, ClassTransformationException, IOException {
+ MetadataHashAggPOP popConfig = (MetadataHashAggPOP) this.popConfig;
+
+ valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
+ new ColumnNamesOptions(context.getOptions()), incoming.getSchema(), popConfig.getPhase())
+ .getValueExpressions();
+
+ return super.createAggregatorInternal();
+ }
+
+ @Override
+ protected List<NamedExpression> getValueExpressions() {
+ return valueExpressions;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
similarity index 80%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
index eda40e9..2563b0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.metadata;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -27,12 +27,12 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import java.util.List;
-public class MetadataAggBatchCreator implements BatchCreator<MetadataAggPOP> {
+public class MetadataHashAggBatchCreator implements BatchCreator<MetadataHashAggPOP> {
@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
- MetadataAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ MetadataHashAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
- return new MetadataAggBatch(config, children.iterator().next(), context);
+ return new MetadataHashAggBatch(config, children.iterator().next(), context);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
new file mode 100644
index 0000000..d982179
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.metadata;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Operator that adds aggregate calls for all incoming columns to calculate required metadata and produces aggregations.
+ * If aggregation is performed on top of another aggregation, the aggregate calls required for merging metadata are added.
+ */
+public class MetadataStreamAggBatch extends StreamingAggBatch {
+
+ private List<NamedExpression> valueExpressions;
+
+ public MetadataStreamAggBatch(MetadataStreamAggPOP popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+ super(popConfig, incoming, context);
+ }
+
+ @Override
+ protected StreamingAggregator createAggregatorInternal()
+ throws SchemaChangeException, ClassTransformationException, IOException {
+ MetadataStreamAggPOP popConfig = (MetadataStreamAggPOP) this.popConfig;
+
+ valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
+ new ColumnNamesOptions(context.getOptions()), incoming.getSchema(), popConfig.getPhase())
+ .getValueExpressions();
+
+ return super.createAggregatorInternal();
+ }
+
+ @Override
+ protected List<NamedExpression> getValueExpressions() {
+ return valueExpressions;
+ }
+}
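Both new batches defer building their value expressions to createAggregatorInternal() because MetadataAggregateHelper needs the incoming schema, which is only available once the first batch arrives. The shared wiring, as it appears in both classes:

    List<NamedExpression> valueExpressions = new MetadataAggregateHelper(
            popConfig.getContext(),
            new ColumnNamesOptions(context.getOptions()),
            incoming.getSchema(),
            popConfig.getPhase())
        .getValueExpressions();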
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
similarity index 80%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
index eda40e9..15c494b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.metadata;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -27,12 +27,12 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import java.util.List;
-public class MetadataAggBatchCreator implements BatchCreator<MetadataAggPOP> {
+public class MetadataStreamAggBatchCreator implements BatchCreator<MetadataStreamAggPOP> {
@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
- MetadataAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ MetadataStreamAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
- return new MetadataAggBatch(config, children.iterator().next(), context);
+ return new MetadataStreamAggBatch(config, children.iterator().next(), context);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 674b3ab..185b8f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -516,7 +516,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
- ((DrillFuncHolderExpr) expr).getFieldReference(namedExpression.getRef());
+ ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef());
cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
if (complexFieldReferencesList == null) {
complexFieldReferencesList = Lists.newArrayList();
@@ -578,7 +578,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
private boolean isImplicitFileColumn(ValueVector vvIn) {
- return columnExplorer.isImplicitFileColumn(vvIn.getField().getName());
+ return columnExplorer.isImplicitOrInternalFileColumn(vvIn.getField().getName());
}
private List<NamedExpression> getExpressionList() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 39189b1..8793a65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -36,6 +36,10 @@ import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
@@ -247,6 +251,10 @@ public class BatchValidator {
rules.put(HashJoinBatch.class, CheckMode.VECTORS);
rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
+ rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS);
+ rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS);
+ rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS);
+ rules.put(MetadataControllerBatch.class, CheckMode.VECTORS);
return rules;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
index 75dfeac..3bbe6ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
@@ -46,7 +46,7 @@ import org.apache.drill.exec.vector.complex.UnionVector;
* <p>
* Derived classes handle the details of the various kinds of readers.
* Today there is a single subclass that builds (test-time)
- * {@link RowSet} objects. The idea, however, is that we may eventually
+ * {@link org.apache.drill.exec.physical.rowSet.RowSet} objects. The idea, however, is that we may eventually
* want to create a "result set reader" for use in internal operators,
* in parallel to the "result set loader". The result set reader would
* handle a stream of incoming batches. The extant RowSet class handles
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
index 4b734e6..9bf9f44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -60,6 +61,10 @@ public class RowSetFormatter {
RowSets.wrap(batch).print();
}
+ public static void print(RecordBatch batch) {
+ RowSets.wrap(batch).print();
+ }
+
public static String toString(RowSet rowSet) {
StringBuilderWriter out = new StringBuilderWriter();
new RowSetFormatter(rowSet, out).write();
@@ -116,18 +121,18 @@ public class RowSetFormatter {
}
private void writeHeader(Writer writer, RowSetReader reader, SelectionVectorMode selectionMode) throws IOException {
- writer.write(Integer.toString(reader.logicalIndex()));
+ writer.write(String.valueOf(reader.logicalIndex()));
switch (selectionMode) {
case FOUR_BYTE:
writer.write(" (");
- writer.write(reader.hyperVectorIndex());
+ writer.write(String.valueOf(reader.hyperVectorIndex()));
writer.write(", ");
- writer.write(Integer.toString(reader.offset()));
+ writer.write(String.valueOf(reader.offset()));
writer.write(")");
break;
case TWO_BYTE:
writer.write(" (");
- writer.write(Integer.toString(reader.offset()));
+ writer.write(String.valueOf(reader.offset()));
writer.write(")");
break;
default:
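The String.valueOf wrappers above are a correctness fix as well as a style change: java.io.Writer.write(int) writes the single character with that code point, so the removed writer.write(reader.hyperVectorIndex()) emitted a character rather than the index's digits. A minimal demonstration:

    import java.io.StringWriter;

    public class WriterIntDemo {
      public static void main(String[] args) {
        StringWriter w = new StringWriter();
        w.write(65);                  // Writer.write(int): the char 'A'
        w.write(String.valueOf(65));  // the digits "65"
        System.out.println(w);        // prints: A65
      }
    }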
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index a3e907b..203150f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
import org.apache.drill.exec.planner.physical.MetadataAggPrule;
import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
@@ -525,6 +526,7 @@ public enum PlannerPhase {
ruleList.add(MetadataControllerPrule.INSTANCE);
ruleList.add(MetadataHandlerPrule.INSTANCE);
ruleList.add(MetadataAggPrule.INSTANCE);
+ ruleList.add(ConvertMetadataAggregateToDirectScanRule.INSTANCE);
ruleList.add(UnnestPrule.INSTANCE);
ruleList.add(LateralJoinPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
index fb29cda..64a0b68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
@@ -102,7 +102,7 @@ public class ConvertCountToDirectScanRule extends RelOptRule {
private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
private ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
- super(rule, "ConvertCountToDirectScanRule:" + id);
+ super(rule, DrillRelFactories.LOGICAL_BUILDER, "ConvertCountToDirectScanRule:" + id);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
new file mode 100644
index 0000000..43f6383
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.IsPredicate;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.direct.DirectGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.metadata.RowGroupMetadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
+import org.apache.drill.metastore.statistics.ExactStatisticsConstants;
+import org.apache.drill.metastore.statistics.StatisticsKind;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.collect.HashBasedTable;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Table;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Rule which converts
+ *
+ * <pre>
+ * MetadataAggRel(metadataLevel=ROW_GROUP)
+ * \
+ * DrillScanRel
+ * </pre>
+ * <p/>
+ * plan into
+ * <pre>
+ * DrillDirectScanRel
+ * </pre>
+ * where {@link DrillDirectScanRel} is populated with row group metadata.
+ * <p/>
+ * For the case when the aggregate level is not ROW_GROUP, the resulting plan will be the following:
+ *
+ * <pre>
+ * MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false)
+ * \
+ * DrillDirectScanRel
+ * </pre>
+ */
+public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule {
+ public static final ConvertMetadataAggregateToDirectScanRule INSTANCE =
+ new ConvertMetadataAggregateToDirectScanRule();
+
+ private static final Logger logger = LoggerFactory.getLogger(ConvertMetadataAggregateToDirectScanRule.class);
+
+ public ConvertMetadataAggregateToDirectScanRule() {
+ super(
+ RelOptHelper.some(MetadataAggRel.class, RelOptHelper.any(DrillScanRel.class)),
+ DrillRelFactories.LOGICAL_BUILDER, "ConvertMetadataAggregateToDirectScanRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ MetadataAggRel agg = call.rel(0);
+ DrillScanRel scan = call.rel(1);
+
+ GroupScan oldGrpScan = scan.getGroupScan();
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+
+ // Apply the rule only for a parquet group scan and only when the required column metadata is present
+ if (!(oldGrpScan instanceof ParquetGroupScan)
+ || (oldGrpScan.getTableMetadata().getInterestingColumns() != null
+ && !oldGrpScan.getTableMetadata().getInterestingColumns().containsAll(agg.getContext().interestingColumns()))) {
+ return;
+ }
+
+ try {
+ DirectGroupScan directScan = buildDirectScan(agg.getContext().interestingColumns(), scan, settings);
+ if (directScan == null) {
+ logger.warn("Unable to use parquet metadata for ANALYZE since some required metadata is absent within parquet metadata");
+ return;
+ }
+
+ RelNode converted = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ directScan, scan.getRowType());
+ if (agg.getContext().metadataLevel() != MetadataType.ROW_GROUP) {
+ MetadataAggregateContext updatedContext = agg.getContext().toBuilder()
+ .createNewAggregations(false)
+ .build();
+ converted = new MetadataAggRel(agg.getCluster(), agg.getTraitSet(), converted, updatedContext);
+ }
+
+ call.transformTo(converted);
+ } catch (Exception e) {
+ logger.warn("Unable to use parquet metadata for ANALYZE: {}", e.getMessage(), e);
+ }
+ }
+
+ private DirectGroupScan buildDirectScan(List<SchemaPath> interestingColumns, DrillScanRel scan, PlannerSettings settings) throws IOException {
+ DrillTable drillTable = Utilities.getDrillTable(scan.getTable());
+
+ ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(settings.getOptions());
+
+ // populates schema to be used when adding record values
+ FormatSelection selection = (FormatSelection) drillTable.getSelection();
+
+ // adds partition columns to the schema
+ Map<String, Class<?>> schema = ColumnExplorer.getPartitionColumnNames(selection.getSelection(), columnNamesOptions).stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ s -> String.class,
+ (o, n) -> n));
+
+ // adds internal implicit columns to the schema
+
+ schema.put(MetastoreAnalyzeConstants.SCHEMA_FIELD, String.class);
+ schema.put(MetastoreAnalyzeConstants.LOCATION_FIELD, String.class);
+ schema.put(columnNamesOptions.rowGroupIndex(), String.class);
+ schema.put(columnNamesOptions.rowGroupStart(), String.class);
+ schema.put(columnNamesOptions.rowGroupLength(), String.class);
+ schema.put(columnNamesOptions.lastModifiedTime(), String.class);
+
+ return populateRecords(interestingColumns, schema, scan, columnNamesOptions);
+ }
+
+ /**
+ * Populates records list with row group metadata.
+ */
+ private DirectGroupScan populateRecords(Collection<SchemaPath> interestingColumns, Map<String, Class<?>> schema,
+ DrillScanRel scan, ColumnNamesOptions columnNamesOptions) throws IOException {
+ ParquetGroupScan parquetGroupScan = (ParquetGroupScan) scan.getGroupScan();
+ DrillTable drillTable = Utilities.getDrillTable(scan.getTable());
+
+ Multimap<Path, RowGroupMetadata> rowGroupsMetadataMap = parquetGroupScan.getMetadataProvider().getRowGroupsMetadataMap();
+
+ Table<String, Integer, Object> recordsTable = HashBasedTable.create();
+ FormatSelection selection = (FormatSelection) drillTable.getSelection();
+ List<String> partitionColumnNames = ColumnExplorer.getPartitionColumnNames(selection.getSelection(), columnNamesOptions);
+
+ FileSystem rawFs = selection.getSelection().getSelectionRoot().getFileSystem(new Configuration());
+ DrillFileSystem fileSystem =
+ ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), rawFs.getConf());
+
+ int rowIndex = 0;
+ for (Map.Entry<Path, RowGroupMetadata> rgEntry : rowGroupsMetadataMap.entries()) {
+ Path path = rgEntry.getKey();
+ RowGroupMetadata rowGroupMetadata = rgEntry.getValue();
+ List<String> partitionValues = ColumnExplorer.listPartitionValues(path, selection.getSelection().getSelectionRoot(), false);
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String partitionColumnName = partitionColumnNames.get(i);
+ recordsTable.put(partitionColumnName, rowIndex, partitionValues.get(i));
+ }
+
+ recordsTable.put(MetastoreAnalyzeConstants.LOCATION_FIELD, rowIndex, ImplicitFileColumns.FQN.getValue(path));
+ recordsTable.put(columnNamesOptions.rowGroupIndex(), rowIndex, String.valueOf(rowGroupMetadata.getRowGroupIndex()));
+
+ if (interestingColumns == null) {
+ interestingColumns = rowGroupMetadata.getColumnsStatistics().keySet();
+ }
+
+ // populates record list with row group column metadata
+ for (SchemaPath schemaPath : interestingColumns) {
+ ColumnStatistics columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath);
+ if (IsPredicate.isNullOrEmpty(columnStatistics)) {
+ logger.debug("Statistics for {} column wasn't found within {} row group.", schemaPath, path);
+ return null;
+ }
+ for (StatisticsKind statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
+ Object statsValue;
+ if (statisticsKind.getName().equalsIgnoreCase(TableStatisticsKind.ROW_COUNT.getName())) {
+ statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata);
+ } else if (statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.NON_NULL_COUNT.getName())) {
+ statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata) - ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
+ } else {
+ statsValue = columnStatistics.get(statisticsKind);
+ }
+ String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.getRootSegmentPath(), statisticsKind);
+ if (statsValue != null) {
+ schema.putIfAbsent(
+ columnStatisticsFieldName,
+ statsValue.getClass());
+ recordsTable.put(columnStatisticsFieldName, rowIndex, statsValue);
+ }
+ }
+ }
+
+ // populates the records list with row-group-level statistics
+ for (StatisticsKind<?> statisticsKind : AnalyzeColumnUtils.META_STATISTICS_FUNCTIONS.keySet()) {
+ String metadataStatisticsFieldName = AnalyzeColumnUtils.getMetadataStatisticsFieldName(statisticsKind);
+ Object statisticsValue = rowGroupMetadata.getStatistic(statisticsKind);
+
+ if (statisticsValue != null) {
+ schema.putIfAbsent(metadataStatisticsFieldName, statisticsValue.getClass());
+ recordsTable.put(metadataStatisticsFieldName, rowIndex, statisticsValue);
+ }
+ }
+
+ // populates the records list with internal columns
+ recordsTable.put(MetastoreAnalyzeConstants.SCHEMA_FIELD, rowIndex, rowGroupMetadata.getSchema().jsonString());
+ recordsTable.put(columnNamesOptions.rowGroupStart(), rowIndex, Long.toString(rowGroupMetadata.getStatistic(() -> ExactStatisticsConstants.START)));
+ recordsTable.put(columnNamesOptions.rowGroupLength(), rowIndex, Long.toString(rowGroupMetadata.getStatistic(() -> ExactStatisticsConstants.LENGTH)));
+ recordsTable.put(columnNamesOptions.lastModifiedTime(), rowIndex, String.valueOf(fileSystem.getFileStatus(path).getModificationTime()));
+
+ rowIndex++;
+ }
+
+ // DynamicPojoRecordReader requires a LinkedHashMap whose field order
+ // corresponds to the value positions in the record list.
+ LinkedHashMap<String, Class<?>> orderedSchema = recordsTable.rowKeySet().stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ column -> schema.getOrDefault(column, Integer.class),
+ (o, n) -> n,
+ LinkedHashMap::new));
+
+ IntFunction<List<Object>> collectRecord = currentIndex -> orderedSchema.keySet().stream()
+ .map(column -> recordsTable.get(column, currentIndex))
+ .collect(Collectors.toList());
+
+ List<List<Object>> records = IntStream.range(0, rowIndex)
+ .mapToObj(collectRecord)
+ .collect(Collectors.toList());
+
+ DynamicPojoRecordReader<?> reader = new DynamicPojoRecordReader<>(orderedSchema, records);
+
+ ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, records.size(), 1, schema.size());
+
+ return new DirectGroupScan(reader, scanStats);
+ }
+}
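Editor's note on the data structure above: populateRecords() keys every value by (column name, row index) in a guava Table, then fixes the column order in a LinkedHashMap before emitting row lists for DynamicPojoRecordReader. A standalone sketch of that pattern, using plain guava (Drill itself uses the shaded package) and illustrative column names:

    import com.google.common.collect.HashBasedTable;
    import com.google.common.collect.Table;

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;

    public class RecordsTableSketch {
      public static void main(String[] args) {
        // values keyed by (column name, row index), as in populateRecords()
        Table<String, Integer, Object> recordsTable = HashBasedTable.create();
        recordsTable.put("rgi", 0, "0");
        recordsTable.put("location", 0, "/data/t/0_0_0.parquet");
        recordsTable.put("rgi", 1, "1");
        recordsTable.put("location", 1, "/data/t/0_0_0.parquet");

        // LinkedHashMap fixes the column order used to read values back per row
        LinkedHashMap<String, Class<?>> orderedSchema = new LinkedHashMap<>();
        recordsTable.rowKeySet().forEach(column -> orderedSchema.put(column, String.class));

        List<List<Object>> records = new ArrayList<>();
        for (int rowIndex = 0; rowIndex < 2; rowIndex++) {
          List<Object> record = new ArrayList<>();
          for (String column : orderedSchema.keySet()) {
            record.add(recordsTable.get(column, rowIndex));
          }
          records.add(record);
        }
        System.out.println(orderedSchema.keySet() + " -> " + records);
      }
    }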
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index 5683c77..6673c63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -21,11 +21,11 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
public class DrillDistributionTrait implements RelTrait {
- public static enum DistributionType {SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, RANDOM_DISTRIBUTED,
- ROUND_ROBIN_DISTRIBUTED, BROADCAST_DISTRIBUTED, ANY};
public static DrillDistributionTrait SINGLETON = new DrillDistributionTrait(DistributionType.SINGLETON);
public static DrillDistributionTrait RANDOM_DISTRIBUTED = new DrillDistributionTrait(DistributionType.RANDOM_DISTRIBUTED);
@@ -33,24 +33,24 @@ public class DrillDistributionTrait implements RelTrait {
public static DrillDistributionTrait DEFAULT = ANY;
- private DistributionType type;
- private final ImmutableList<DistributionField> fields;
+ private final DistributionType type;
+ private final List<DistributionField> fields;
private PartitionFunction partitionFunction = null;
public DrillDistributionTrait(DistributionType type) {
assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
|| type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
this.type = type;
- this.fields = ImmutableList.<DistributionField>of();
+ this.fields = Collections.emptyList();
}
- public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields) {
+ public DrillDistributionTrait(DistributionType type, List<DistributionField> fields) {
assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
this.type = type;
this.fields = fields;
}
- public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields,
+ public DrillDistributionTrait(DistributionType type, List<DistributionField> fields,
PartitionFunction partitionFunction) {
assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
this.type = type;
@@ -105,7 +105,7 @@ public class DrillDistributionTrait implements RelTrait {
return this.type;
}
- public ImmutableList<DistributionField> getFields() {
+ public List<DistributionField> getFields() {
return fields;
}
@@ -114,12 +114,7 @@ public class DrillDistributionTrait implements RelTrait {
}
private boolean arePartitionFunctionsSame(PartitionFunction f1, PartitionFunction f2) {
- if (f1 != null && f2 != null) {
- return f1.equals(f2);
- } else if (f2 == null && f2 == null) {
- return true;
- }
- return false;
+ return Objects.equals(f1, f2);
}
@Override
@@ -145,6 +140,15 @@ public class DrillDistributionTrait implements RelTrait {
return fields == null ? this.type.toString() : this.type.toString() + "(" + fields + ")";
}
+ public enum DistributionType {
+ SINGLETON,
+ HASH_DISTRIBUTED,
+ RANGE_DISTRIBUTED,
+ RANDOM_DISTRIBUTED,
+ ROUND_ROBIN_DISTRIBUTED,
+ BROADCAST_DISTRIBUTED,
+ ANY
+ }
public static class DistributionField {
/**
@@ -180,4 +184,51 @@ public class DrillDistributionTrait implements RelTrait {
}
}
+ /**
+ * Stores the distribution field index and field name to be used in exchange operators.
+ * The field name is required for the case of dynamic schema discovery,
+ * when the field is not present in the rel data type at planning time.
+ */
+ public static class NamedDistributionField extends DistributionField {
+ /**
+ * Name of the field being DISTRIBUTED.
+ */
+ private final String fieldName;
+
+ public NamedDistributionField(int fieldId, String fieldName) {
+ super(fieldId);
+ this.fieldName = fieldName;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ NamedDistributionField that = (NamedDistributionField) o;
+
+ return Objects.equals(fieldName, that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), fieldName);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(%s)", fieldName, getFieldId());
+ }
+ }
}
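Editor's note: the Objects.equals rewrite above is more than cleanup. The removed branch compared f2 with itself ("f2 == null && f2 == null"), so a null partition function on one side matched any non-null function on the other. A tiny standalone sketch of the corrected semantics:

    import java.util.Objects;

    public class NullSafeEqualsSketch {
      public static void main(String[] args) {
        Object f1 = "hash-partition";  // stand-in for a PartitionFunction
        Object f2 = null;
        System.out.println(Objects.equals(f1, f2));     // false, as intended
        System.out.println(Objects.equals(null, null)); // true
        System.out.println(Objects.equals(f1, f1));     // true
      }
    }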
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 7300a80..fdc03ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.planner.physical;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
@@ -59,8 +58,7 @@ public class HashAggPrule extends AggPruleBase {
final DrillAggregateRel aggregate = call.rel(0);
final RelNode input = call.rel(1);
- if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0
- || requiresStreamingAgg(aggregate)) {
+ if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
// currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
// if there are no grouping keys
return;
@@ -103,16 +101,6 @@ public class HashAggPrule extends AggPruleBase {
}
}
- private boolean requiresStreamingAgg(DrillAggregateRel aggregate) {
- //If contains ANY_VALUE aggregate, using HashAgg would not work
- for (AggregateCall agg : aggregate.getAggCallList()) {
- if (agg.getAggregation().getName().equalsIgnoreCase("any_value")) {
- return true;
- }
- }
- return false;
- }
-
private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> {
final RelTrait distOnAllKeys;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
index 494eb82..a7e8cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.physical;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.NamedDistributionField;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.type.RelDataType;
@@ -52,13 +53,8 @@ public class HashPrelUtil {
/**
* Implementation of {@link HashExpressionCreatorHelper} for {@link LogicalExpression} type.
*/
- public static HashExpressionCreatorHelper<LogicalExpression> HASH_HELPER_LOGICALEXPRESSION =
- new HashExpressionCreatorHelper<LogicalExpression>() {
- @Override
- public LogicalExpression createCall(String funcName, List<LogicalExpression> inputFiled) {
- return new FunctionCall(funcName, inputFiled, ExpressionPosition.UNKNOWN);
- }
- };
+ public static HashExpressionCreatorHelper<LogicalExpression> HASH_HELPER_LOGICAL_EXPRESSION =
+ (funcName, inputField) -> new FunctionCall(funcName, inputField, ExpressionPosition.UNKNOWN);
public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
private final RexBuilder rexBuilder;
@@ -161,7 +157,7 @@ public class HashPrelUtil {
* @return hash expression
*/
public static LogicalExpression getHash64Expression(LogicalExpression field, LogicalExpression seed, boolean hashAsDouble) {
- return createHash64Expression(ImmutableList.of(field), seed, HASH_HELPER_LOGICALEXPRESSION, hashAsDouble);
+ return createHash64Expression(ImmutableList.of(field), seed, HASH_HELPER_LOGICAL_EXPRESSION, hashAsDouble);
}
/**
@@ -173,33 +169,38 @@ public class HashPrelUtil {
* @return hash expression
*/
public static LogicalExpression getHashExpression(LogicalExpression field, LogicalExpression seed, boolean hashAsDouble) {
- return createHashExpression(ImmutableList.of(field), seed, HASH_HELPER_LOGICALEXPRESSION, hashAsDouble);
+ return createHashExpression(ImmutableList.of(field), seed, HASH_HELPER_LOGICAL_EXPRESSION, hashAsDouble);
}
/**
* Create a distribution hash expression.
*
- * @param fields Distribution fields
+ * @param fields Distribution fields
* @param rowType Row type
- * @return
+ * @return distribution hash expression
*/
public static LogicalExpression getHashExpression(List<DistributionField> fields, RelDataType rowType) {
assert fields.size() > 0;
- final List<String> childFields = rowType.getFieldNames();
+ List<String> childFields = rowType.getFieldNames();
// If we already included a field with hash - no need to calculate hash further down
- if ( childFields.contains(HASH_EXPR_NAME)) {
+ if (childFields.contains(HASH_EXPR_NAME)) {
return new FieldReference(HASH_EXPR_NAME);
}
- final List<LogicalExpression> expressions = new ArrayList<LogicalExpression>(childFields.size());
- for(int i =0; i < fields.size(); i++){
- expressions.add(new FieldReference(childFields.get(fields.get(i).getFieldId()), ExpressionPosition.UNKNOWN));
+ List<LogicalExpression> expressions = new ArrayList<>();
+ for (DistributionField field : fields) {
+ if (field instanceof NamedDistributionField) {
+ NamedDistributionField namedDistributionField = (NamedDistributionField) field;
+ expressions.add(new FieldReference(namedDistributionField.getFieldName(), ExpressionPosition.UNKNOWN));
+ } else {
+ expressions.add(new FieldReference(childFields.get(field.getFieldId()), ExpressionPosition.UNKNOWN));
+ }
}
- final LogicalExpression distSeed = ValueExpressions.getInt(DIST_SEED);
- return createHashBasedPartitionExpression(expressions, distSeed, HASH_HELPER_LOGICALEXPRESSION);
+ LogicalExpression distSeed = ValueExpressions.getInt(DIST_SEED);
+ return createHashBasedPartitionExpression(expressions, distSeed, HASH_HELPER_LOGICAL_EXPRESSION);
}
}
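Editor's note: the NamedDistributionField branch added to getHashExpression() resolves a field by its stored name rather than by position in the row type, which is what makes hash distribution work when the schema is discovered dynamically and the field is absent from the rel data type. A simplified, self-contained sketch of that dispatch; the two nested classes stand in for DistributionField and NamedDistributionField:

    import java.util.Arrays;
    import java.util.List;

    public class HashFieldSketch {
      static class Field {
        final int id;
        Field(int id) { this.id = id; }
      }

      static class NamedField extends Field {
        final String name;
        NamedField(int id, String name) { super(id); this.name = name; }
      }

      static String fieldName(Field field, List<String> rowTypeFields) {
        if (field instanceof NamedField) {
          // dynamic schema: the stored name survives even when the row type lacks the field
          return ((NamedField) field).name;
        }
        return rowTypeFields.get(field.id); // positional lookup, as before
      }

      public static void main(String[] args) {
        List<String> rowType = Arrays.asList("a", "b");
        System.out.println(fieldName(new Field(1), rowType));              // b
        System.out.println(fieldName(new NamedField(5, "dir0"), rowType)); // dir0
      }
    }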
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
index 2ce6e46..9a9e11c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
@@ -20,15 +20,26 @@ package org.apache.drill.exec.planner.physical;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.MetadataAggRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.NamedDistributionField;
+import org.apache.drill.exec.store.parquet.FilterEvaluatorUtils.FieldReferenceFinder;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
public class MetadataAggPrule extends Prule {
public static final MetadataAggPrule INSTANCE = new MetadataAggPrule();
@@ -40,21 +51,178 @@ public class MetadataAggPrule extends Prule {
@Override
public void onMatch(RelOptRuleCall call) {
- MetadataAggRel relNode = call.rel(0);
- RelNode input = relNode.getInput();
-
- int groupByExprsSize = relNode.getContext().groupByExpressions().size();
-
- // group by expressions will be returned first
- RelCollation collation = RelCollations.of(IntStream.range(1, groupByExprsSize)
- .mapToObj(RelFieldCollation::new)
- .collect(Collectors.toList()));
-
- // TODO: update DrillDistributionTrait when implemented parallelization for metadata collecting (see DRILL-7433)
- RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
- traits = groupByExprsSize > 0 ? traits.plus(collation) : traits;
- RelNode convertedInput = convert(input, traits);
- call.transformTo(
- new MetadataAggPrel(relNode.getCluster(), traits, convertedInput, relNode.getContext()));
+ MetadataAggRel aggregate = call.rel(0);
+ RelNode input = aggregate.getInput();
+
+ int groupByExprsSize = aggregate.getContext().groupByExpressions().size();
+
+ List<RelFieldCollation> collations = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+ for (int i = 0; i < groupByExprsSize; i++) {
+ collations.add(new RelFieldCollation(i + 1));
+ SchemaPath fieldPath = getArgumentReference(aggregate.getContext().groupByExpressions().get(i));
+ names.add(fieldPath.getRootSegmentPath());
+ }
+
+ RelCollation collation = new NamedRelCollation(collations, names);
+
+ RelTraitSet traits;
+
+ if (aggregate.getContext().groupByExpressions().isEmpty()) {
+ DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+ RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+
+ createTransformRequest(call, aggregate, input, singleDistTrait);
+ } else {
+ // hash distribute on all grouping keys
+ DrillDistributionTrait distOnAllKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionFields(aggregate.getContext().groupByExpressions())));
+
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ boolean smallInput =
+ input.estimateRowCount(input.getCluster().getMetadataQuery()) < settings.getSliceTarget();
+
+ // force two-phase aggregation for the bottom aggregate call so that,
+ // for large inputs, the sort is applied locally to pre-aggregated data
+ if (aggregate.getContext().createNewAggregations() && !smallInput) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+ RelNode convertedInput = convert(input, traits);
+
+ new TwoPhaseMetadataAggSubsetTransformer(call, collation, distOnAllKeys)
+ .go(aggregate, convertedInput);
+ } else {
+ // TODO: DRILL-7433 - replace DrillDistributionTrait.SINGLETON with distOnAllKeys when parallelization for MetadataHandler is implemented
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.SINGLETON);
+ createTransformRequest(call, aggregate, input, traits);
+ }
+ }
+ }
+
+ private void createTransformRequest(RelOptRuleCall call, MetadataAggRel aggregate,
+ RelNode input, RelTraitSet traits) {
+
+ RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
+
+ MetadataStreamAggPrel newAgg = new MetadataStreamAggPrel(
+ aggregate.getCluster(),
+ traits,
+ convertedInput,
+ aggregate.getContext(),
+ OperatorPhase.PHASE_1of1);
+
+ call.transformTo(newAgg);
+ }
+
+ /**
+ * Returns a list of named distribution fields which correspond to the arguments of the specified expressions.
+ *
+ * @param namedExpressions expressions list
+ * @return list of {@link NamedDistributionField} instances
+ */
+ private static List<NamedDistributionField> getDistributionFields(List<NamedExpression> namedExpressions) {
+ List<NamedDistributionField> distributionFields = new ArrayList<>();
+ int groupByExprsSize = namedExpressions.size();
+
+ for (int index = 0; index < groupByExprsSize; index++) {
+ SchemaPath fieldPath = getArgumentReference(namedExpressions.get(index));
+ NamedDistributionField field =
+ new NamedDistributionField(index + 1, fieldPath.getRootSegmentPath());
+ distributionFields.add(field);
+ }
+
+ return distributionFields;
+ }
+
+ /**
+ * Returns a {@link FieldReference} instance which corresponds to the argument of the specified {@code namedExpression}.
+ *
+ * @param namedExpression expression
+ * @return {@link FieldReference} instance
+ */
+ private static FieldReference getArgumentReference(NamedExpression namedExpression) {
+ Set<SchemaPath> arguments = namedExpression.getExpr().accept(FieldReferenceFinder.INSTANCE, null);
+ assert arguments.size() == 1 : "Group by expression must contain exactly one field reference";
+ return new FieldReference(arguments.iterator().next());
+ }
+
+ /**
+ * Implementation of {@link RelCollationImpl} with field names.
+ * Stores the {@link RelFieldCollation} list and corresponding field names to be used in sort operators.
+ * The field names are required for the case of dynamic schema discovery,
+ * when a field is not present in the rel data type at planning time.
+ */
+ public static class NamedRelCollation extends RelCollationImpl {
+ private final List<String> names;
+
+ protected NamedRelCollation(List<RelFieldCollation> fieldCollations, List<String> names) {
+ super(com.google.common.collect.ImmutableList.copyOf(fieldCollations));
+ this.names = Collections.unmodifiableList(names);
+ }
+
+ public String getName(int collationIndex) {
+ return names.get(collationIndex - 1);
+ }
+ }
+
+ /**
+ * {@link SubsetTransformer} for creating two-phase metadata aggregation.
+ */
+ private static class TwoPhaseMetadataAggSubsetTransformer
+ extends SubsetTransformer<MetadataAggRel, RuntimeException> {
+
+ private final RelCollation collation;
+ private final DrillDistributionTrait distributionTrait;
+
+ public TwoPhaseMetadataAggSubsetTransformer(RelOptRuleCall call,
+ RelCollation collation, DrillDistributionTrait distributionTrait) {
+ super(call);
+ this.collation = collation;
+ this.distributionTrait = distributionTrait;
+ }
+
+ @Override
+ public RelNode convertChild(MetadataAggRel aggregate, RelNode child) {
+ DrillDistributionTrait toDist = child.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, RelCollations.EMPTY, toDist);
+ RelNode newInput = convert(child, traits);
+
+ // maps group-by expressions to themselves so that the second aggregation phase can be produced
+ List<NamedExpression> identityExpressions = aggregate.getContext().groupByExpressions().stream()
+ .map(namedExpression -> new NamedExpression(namedExpression.getExpr(), getArgumentReference(namedExpression)))
+ .collect(Collectors.toList());
+
+ // use hash aggregation for the first stage to avoid sorting raw data
+ MetadataHashAggPrel phase1Agg = new MetadataHashAggPrel(
+ aggregate.getCluster(),
+ traits,
+ newInput,
+ aggregate.getContext().toBuilder().groupByExpressions(identityExpressions).build(),
+ OperatorPhase.PHASE_1of2);
+
+ traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist).plus(distributionTrait);
+ SortPrel sort = new SortPrel(
+ aggregate.getCluster(),
+ traits,
+ phase1Agg,
+ (RelCollation) traits.getTrait(collation.getTraitDef()));
+
+ int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+ HashToMergeExchangePrel exch =
+ new HashToMergeExchangePrel(phase1Agg.getCluster(),
+ traits,
+ sort,
+ ImmutableList.copyOf(getDistributionFields(aggregate.getContext().groupByExpressions())),
+ collation,
+ numEndPoints);
+
+ return new MetadataStreamAggPrel(
+ aggregate.getCluster(),
+ newTraitSet(Prel.DRILL_PHYSICAL, collation, DrillDistributionTrait.SINGLETON),
+ exch,
+ aggregate.getContext(),
+ OperatorPhase.PHASE_2of2);
+ }
}
}
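Editor's note: putting convertChild together, for a large input with group-by expressions the transformer emits the following physical plan (same tree notation as the rule javadoc earlier in this patch):

    MetadataStreamAggPrel (PHASE_2of2, SINGLETON)
      \
      HashToMergeExchangePrel (hash-distributed on the group-by keys, merging on the collation)
        \
        SortPrel (collation over the group-by keys)
          \
          MetadataHashAggPrel (PHASE_1of2, group-by expressions mapped to themselves)
            \
            converted input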
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
index 8424469..bd8beab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
@@ -36,7 +36,7 @@ public class MetadataHandlerPrule extends Prule {
public void onMatch(RelOptRuleCall call) {
MetadataHandlerRel relNode = call.rel(0);
RelNode input = relNode.getInput();
- RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.DEFAULT);
+ RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
RelNode convertedInput = convert(input, traits);
call.transformTo(new MetadataHandlerPrel(relNode.getCluster(), traits,
convertedInput,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
similarity index 78%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
index a50f1a8..8f454f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
@@ -21,11 +21,11 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
import org.apache.drill.exec.planner.common.DrillRelNode;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -33,19 +33,21 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
+public class MetadataHashAggPrel extends SingleRel implements DrillRelNode, Prel {
private final MetadataAggregateContext context;
+ private final AggPrelBase.OperatorPhase phase;
- public MetadataAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- MetadataAggregateContext context) {
+ public MetadataHashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ MetadataAggregateContext context, AggPrelBase.OperatorPhase phase) {
super(cluster, traits, input);
this.context = context;
+ this.phase = phase;
}
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getInput();
- MetadataAggPOP physicalOperator = new MetadataAggPOP(child.getPhysicalOperator(creator), context);
+ MetadataHashAggPOP physicalOperator = new MetadataHashAggPOP(child.getPhysicalOperator(creator), context, phase);
return creator.addMetadata(this, physicalOperator);
}
@@ -66,7 +68,7 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
@Override
public boolean needsFinalColumnReordering() {
- return true;
+ return false;
}
@Override
@@ -77,6 +79,10 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
Preconditions.checkState(inputs.size() == 1);
- return new MetadataAggPrel(getCluster(), traitSet, inputs.iterator().next(), context);
+ return new MetadataHashAggPrel(getCluster(), traitSet, inputs.iterator().next(), context, phase);
+ }
+
+ public AggPrelBase.OperatorPhase getPhase() {
+ return phase;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
similarity index 77%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
index a50f1a8..5da8917 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
@@ -22,8 +22,9 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
import org.apache.drill.exec.record.BatchSchema;
@@ -33,19 +34,21 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
+public class MetadataStreamAggPrel extends SingleRel implements DrillRelNode, Prel {
private final MetadataAggregateContext context;
+ private final OperatorPhase phase;
- public MetadataAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- MetadataAggregateContext context) {
+ public MetadataStreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ MetadataAggregateContext context, OperatorPhase phase) {
super(cluster, traits, input);
this.context = context;
+ this.phase = phase;
}
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getInput();
- MetadataAggPOP physicalOperator = new MetadataAggPOP(child.getPhysicalOperator(creator), context);
+ MetadataStreamAggPOP physicalOperator = new MetadataStreamAggPOP(child.getPhysicalOperator(creator), context, phase);
return creator.addMetadata(this, physicalOperator);
}
@@ -66,7 +69,7 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
@Override
public boolean needsFinalColumnReordering() {
- return true;
+ return false;
}
@Override
@@ -77,6 +80,10 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
Preconditions.checkState(inputs.size() == 1);
- return new MetadataAggPrel(getCluster(), traitSet, inputs.iterator().next(), context);
+ return new MetadataStreamAggPrel(getCluster(), traitSet, inputs.iterator().next(), context, phase);
+ }
+
+ public OperatorPhase getPhase() {
+ return phase;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index e6cba6b..4639bdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.planner.physical;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRuleCall;
@@ -37,22 +35,33 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Function;
public class PrelUtil {
public static List<Ordering> getOrdering(RelCollation collation, RelDataType rowType) {
- List<Ordering> orderExpr = Lists.newArrayList();
-
- final List<String> childFields = rowType.getFieldNames();
-
- for (RelFieldCollation fc : collation.getFieldCollations()) {
- FieldReference fr = new FieldReference(childFields.get(fc.getFieldIndex()), ExpressionPosition.UNKNOWN);
- orderExpr.add(new Ordering(fc.getDirection(), fr, fc.nullDirection));
+ List<Ordering> orderExpr = new ArrayList<>();
+
+ List<String> childFields = rowType.getFieldNames();
+ Function<RelFieldCollation, String> fieldNameProvider;
+ if (collation instanceof MetadataAggPrule.NamedRelCollation) {
+ fieldNameProvider = fieldCollation -> {
+ MetadataAggPrule.NamedRelCollation namedCollation = (MetadataAggPrule.NamedRelCollation) collation;
+ return namedCollation.getName(fieldCollation.getFieldIndex());
+ };
+ } else {
+ fieldNameProvider = fieldCollation -> childFields.get(fieldCollation.getFieldIndex());
}
+ collation.getFieldCollations().forEach(fieldCollation -> {
+ FieldReference fieldReference = new FieldReference(fieldNameProvider.apply(fieldCollation), ExpressionPosition.UNKNOWN);
+ orderExpr.add(new Ordering(fieldCollation.getDirection(), fieldReference, fieldCollation.nullDirection));
+ });
+
return orderExpr;
}
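Editor's note: mind the offset in NamedRelCollation.getName(). MetadataAggPrule builds collations starting at field index 1, so the name list is shifted by one relative to the collation index. A minimal sketch of the mapping getOrdering() relies on (names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class CollationNameSketch {
      public static void main(String[] args) {
        List<String> names = Arrays.asList("dir0", "fqn"); // assumed group-by field names
        int fieldIndex = 1;                                // first collation field index
        System.out.println(names.get(fieldIndex - 1));     // prints dir0
      }
    }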
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
index c34ee99..36eae41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.util.function.CheckedSupplier;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
import org.apache.drill.exec.metastore.analyze.MetadataControllerContext;
@@ -116,12 +117,14 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.build(logger);
}
+ ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(context.getOptions());
+
SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
// creates select with DYNAMIC_STAR column and analyze specific columns to obtain corresponding table scan
SqlSelect scanSql = new SqlSelect(
SqlParserPos.ZERO,
SqlNodeList.EMPTY,
- getColumnList(sqlAnalyzeTable, analyzeInfoProvider),
+ getColumnList(analyzeInfoProvider.getProjectionFields(table, getMetadataType(sqlAnalyzeTable), columnNamesOptions)),
tableIdentifier,
null,
null,
@@ -175,13 +178,12 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
/**
* Generates the column list with {@link SchemaPath#DYNAMIC_STAR} and columns required for analyze.
*/
- private SqlNodeList getColumnList(SqlMetastoreAnalyzeTable sqlAnalyzeTable, AnalyzeInfoProvider analyzeInfoProvider) {
+ private SqlNodeList getColumnList(List<SchemaPath> projectingColumns) {
SqlNodeList columnList = new SqlNodeList(SqlParserPos.ZERO);
columnList.add(new SqlIdentifier(SchemaPath.DYNAMIC_STAR, SqlParserPos.ZERO));
- MetadataType metadataLevel = getMetadataType(sqlAnalyzeTable);
- for (SqlIdentifier field : analyzeInfoProvider.getProjectionFields(metadataLevel, context.getPlannerSettings().getOptions())) {
- columnList.add(field);
- }
+ projectingColumns.stream()
+ .map(segmentColumn -> new SqlIdentifier(segmentColumn.getRootSegmentPath(), SqlParserPos.ZERO))
+ .forEach(columnList::add);
return columnList;
}
@@ -219,7 +221,9 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.workspace(workspaceName)
.build();
- List<String> segmentColumns = analyzeInfoProvider.getSegmentColumns(table, context.getPlannerSettings().getOptions()).stream()
+ ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(context.getOptions());
+
+ List<String> segmentColumns = analyzeInfoProvider.getSegmentColumns(table, columnNamesOptions).stream()
.map(SchemaPath::getRootSegmentPath)
.collect(Collectors.toList());
List<NamedExpression> segmentExpressions = segmentColumns.stream()
@@ -298,7 +302,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.forEach(statisticsColumns::add);
}
- SchemaPath locationField = analyzeInfoProvider.getLocationField(config.getContext().getOptions());
+ SchemaPath locationField = analyzeInfoProvider.getLocationField(columnNamesOptions);
if (analyzeInfoProvider.supportsMetadataType(MetadataType.ROW_GROUP) && metadataLevel.includes(MetadataType.ROW_GROUP)) {
MetadataHandlerContext handlerContext = MetadataHandlerContext.builder()
@@ -344,7 +348,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.build();
convertedRelNode = getSegmentAggRelNode(segmentExpressions, convertedRelNode,
- createNewAggregations, statisticsColumns, locationField, analyzeInfoProvider, i, handlerContext);
+ createNewAggregations, statisticsColumns, locationField, i, handlerContext);
locationField = SchemaPath.getSimplePath(MetastoreAnalyzeConstants.LOCATION_FIELD);
@@ -409,6 +413,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.interestingColumns(statisticsColumns)
.createNewAggregations(createNewAggregations)
.excludedColumns(excludedColumns)
+ .metadataLevel(MetadataType.TABLE)
.build();
convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -424,22 +429,20 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
private DrillRel getSegmentAggRelNode(List<NamedExpression> segmentExpressions, DrillRel convertedRelNode,
boolean createNewAggregations, List<SchemaPath> statisticsColumns, SchemaPath locationField,
- AnalyzeInfoProvider analyzeInfoProvider, int segmentLevel, MetadataHandlerContext handlerContext) {
- SchemaPath lastModifiedTimeField =
+ int segmentLevel, MetadataHandlerContext handlerContext) {
+ SchemaPath lastModifiedTimeField =
SchemaPath.getSimplePath(config.getContext().getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL));
List<SchemaPath> excludedColumns = Arrays.asList(lastModifiedTimeField, locationField);
- List<NamedExpression> groupByExpressions = new ArrayList<>();
- groupByExpressions.add(analyzeInfoProvider.getParentLocationExpression(locationField));
-
- groupByExpressions.addAll(segmentExpressions);
+ List<NamedExpression> groupByExpressions = new ArrayList<>(segmentExpressions);
MetadataAggregateContext aggregateContext = MetadataAggregateContext.builder()
- .groupByExpressions(groupByExpressions.subList(0, segmentLevel + 1))
+ .groupByExpressions(groupByExpressions.subList(0, segmentLevel))
.interestingColumns(statisticsColumns)
.createNewAggregations(createNewAggregations)
.excludedColumns(excludedColumns)
+ .metadataLevel(MetadataType.SEGMENT)
.build();
convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -470,6 +473,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.interestingColumns(statisticsColumns)
.createNewAggregations(createNewAggregations)
.excludedColumns(excludedColumns)
+ .metadataLevel(MetadataType.FILE)
.build();
convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -508,6 +512,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
.interestingColumns(statisticsColumns)
.createNewAggregations(createNewAggregations)
.excludedColumns(excludedColumns)
+ .metadataLevel(MetadataType.ROW_GROUP)
.build();
convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -525,11 +530,10 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
SchemaPath locationField, String rowGroupIndexColumn, SchemaPath rgiField) {
List<NamedExpression> rowGroupGroupByExpressions = new ArrayList<>(segmentExpressions);
rowGroupGroupByExpressions.add(
+ new NamedExpression(locationField, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+ rowGroupGroupByExpressions.add(
new NamedExpression(rgiField,
FieldReference.getWithQuotedRef(rowGroupIndexColumn)));
-
- rowGroupGroupByExpressions.add(
- new NamedExpression(locationField, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
return rowGroupGroupByExpressions;
}
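Editor's note: each aggregation level in the handler now stamps its MetadataType on the context it builds. A condensed, non-runnable sketch of the builder chain as it appears at the row-group level; the list arguments are placeholders for the values the handler assembles:

    // Fragment, not standalone: MetadataAggregateContext and MetadataType
    // come from this patch; the lists are assembled earlier in the handler.
    MetadataAggregateContext aggregateContext = MetadataAggregateContext.builder()
        .groupByExpressions(rowGroupGroupByExpressions)
        .interestingColumns(statisticsColumns)
        .createNewAggregations(createNewAggregations)
        .excludedColumns(excludedColumns)
        .metadataLevel(MetadataType.ROW_GROUP) // TABLE, SEGMENT, or FILE at other levels
        .build();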
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 2b2dd74..d4c45ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -156,7 +157,7 @@ public class ColumnExplorer {
* @param path column path
* @return true if given column is partition, false otherwise
*/
- public static boolean isPartitionColumn(String partitionDesignator, String path){
+ public static boolean isPartitionColumn(String partitionDesignator, String path) {
Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
Matcher matcher = pattern.matcher(path);
return matcher.matches();
@@ -164,7 +165,18 @@ public class ColumnExplorer {
public boolean isImplicitColumn(String name) {
return isPartitionColumn(partitionDesignator, name) ||
- isImplicitFileColumn(name);
+ isImplicitOrInternalFileColumn(name);
+ }
+
+ /**
+ * Checks whether the given column is implicit or internal.
+ *
+ * @param name name of the column to check
+ * @return {@code true} if the given column is implicit or internal, {@code false} otherwise
+ */
+ public boolean isImplicitOrInternalFileColumn(String name) {
+ return allImplicitColumns.get(name) != null
+ || allInternalColumns.get(name) != null;
}
public boolean isImplicitFileColumn(String name) {
@@ -191,14 +203,13 @@ public class ColumnExplorer {
* Returns list with partition column names.
* For the case when table has several levels of nesting, max level is chosen.
*
- * @param selection the source of file paths
- * @param optionManager the source of session option value for partition column label
+ * @param selection the source of file paths
+ * @param columnNamesOptions the source of session option value for partition column label
* @return list with partition column names.
*/
- public static List<String> getPartitionColumnNames(FileSelection selection, OptionManager optionManager) {
+ public static List<String> getPartitionColumnNames(FileSelection selection, ColumnNamesOptions columnNamesOptions) {
- String partitionColumnLabel = optionManager.getString(
- ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ String partitionColumnLabel = columnNamesOptions.partitionColumnNameLabel();
return getPartitionColumnNames(selection, partitionColumnLabel);
}
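Editor's note: the tidied isPartitionColumn() above matches the partition designator followed by digits. A standalone sketch, assuming the default designator "dir":

    import java.util.regex.Pattern;

    public class PartitionColumnSketch {
      public static void main(String[] args) {
        // "dir" is the default filesystem partition column label
        String partitionDesignator = "dir";
        Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
        System.out.println(pattern.matcher("dir0").matches());       // true
        System.out.println(pattern.matcher("dir12").matches());      // true
        System.out.println(pattern.matcher("directory").matches());  // false
      }
    }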
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 50112ab..a9bff98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -31,6 +32,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.drill.common.expression.ExpressionStringBuilder;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -72,7 +74,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMetadata {
+public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMetadata<ParquetMetadataProvider> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
@@ -140,6 +142,9 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
*/
@Override
public List<EndpointAffinity> getOperatorAffinity() {
+ if (endpointAffinities == null) {
+ this.endpointAffinities = AffinityCreator.getAffinityMap(getRowGroupInfos());
+ }
return endpointAffinities;
}
@@ -369,11 +374,11 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
return getFilterer()
.rowGroups(prunedRowGroups)
- .table(getTableMetadata())
- .partitions(getPartitionsMetadata())
- .segments(getSegmentsMetadata())
+ .table(tableMetadata)
+ .partitions(partitions)
+ .segments(segments)
.files(qualifiedFiles)
- .nonInterestingColumns(getNonInterestingColumnsMetadata())
+ .nonInterestingColumns(nonInterestingColumnsMetadata)
.matching(matchAllMetadata)
.build();
}
@@ -433,17 +438,9 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
}
// protected methods block
- @Override
- protected void init() throws IOException {
- super.init();
-
- this.partitionColumns = metadataProvider.getPartitionColumns();
- this.endpointAffinities = AffinityCreator.getAffinityMap(getRowGroupInfos());
- }
-
protected Multimap<Path, RowGroupMetadata> getRowGroupsMetadata() {
if (rowGroups == null) {
- rowGroups = ((ParquetMetadataProvider) metadataProvider).getRowGroupsMetadataMap();
+ rowGroups = metadataProvider.getRowGroupsMetadataMap();
}
return rowGroups;
}
@@ -534,8 +531,6 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
newScan.fileSet = new HashSet<>(newScan.getRowGroupsMetadata().keySet());
}
- newScan.endpointAffinities = AffinityCreator.getAffinityMap(newScan.getRowGroupInfos());
-
return newScan;
}
@@ -593,6 +588,16 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
this.rowGroups = LinkedListMultimap.create();
filteredRowGroups.forEach(entry -> this.rowGroups.put(entry.getPath(), entry));
+ // update the files list to keep only the files whose row groups remain after pruning
+ if (MapUtils.isNotEmpty(files)) {
+ files = rowGroups.keySet().stream()
+ .map(files::get)
+ .collect(Collectors.toMap(
+ FileMetadata::getPath,
+ Function.identity(),
+ (o, n) -> n,
+ LinkedHashMap::new));
+ }
} else {
this.rowGroups = prunedRowGroups;
matchAllMetadata = false;
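
Note: the rebuild of the files map above relies on the four-argument Collectors.toMap overload, which preserves encounter order through a LinkedHashMap and resolves duplicate keys with a merge function. A self-contained illustration with made-up values:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapOrderDemo {
  public static void main(String[] args) {
    Map<String, String> byFirstChar = Arrays.asList("alpha", "beta", "avocado").stream()
        .collect(Collectors.toMap(
            s -> s.substring(0, 1),   // key: first character
            Function.identity(),      // value: the string itself
            (oldV, newV) -> newV,     // on duplicate key, keep the latest
            LinkedHashMap::new));     // preserve insertion order
    System.out.println(byFirstChar);  // prints {a=avocado, b=beta}
  }
}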
@@ -601,6 +606,67 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
}
/**
+ * Filters metadata at the file level.
+ *
+ * @param optionManager option manager
+ * @param filterPredicate filter expression
+ * @param schemaPathsInExpr columns used in filter expression
+ */
+ @Override
+ protected void filterFileMetadata(OptionManager optionManager,
+ FilterPredicate<?> filterPredicate,
+ Set<SchemaPath> schemaPathsInExpr) {
+ Map<Path, FileMetadata> prunedFiles;
+ if (!source.getPartitionsMetadata().isEmpty()
+ && source.getPartitionsMetadata().size() > getPartitions().size()) {
+ // prune files to leave only those contained in the remaining partitions
+ prunedFiles = pruneForPartitions(source.getFilesMetadata(), getPartitions());
+ } else if (!source.getSegmentsMetadata().isEmpty()
+ && source.getSegmentsMetadata().size() > getSegments().size()) {
+ // prune files to leave only those contained in the remaining segments
+ prunedFiles = pruneForSegments(source.getFilesMetadata(), getSegments());
+ } else {
+ prunedFiles = source.getFilesMetadata();
+ }
+
+ if (isMatchAllMetadata()) {
+ files = prunedFiles;
+ return;
+ }
+
+ // files with only a single row group can be pruned later, during row group pruning
+ Map<Path, FileMetadata> omittedFiles = new HashMap<>();
+
+ AbstractParquetGroupScan abstractParquetGroupScan = (AbstractParquetGroupScan) source;
+
+ Map<Path, FileMetadata> filesToFilter = new HashMap<>(prunedFiles);
+ if (!abstractParquetGroupScan.rowGroups.isEmpty()) {
+ prunedFiles.forEach((path, fileMetadata) -> {
+ if (abstractParquetGroupScan.rowGroups.get(path).size() == 1) {
+ omittedFiles.put(path, fileMetadata);
+ filesToFilter.remove(path);
+ }
+ });
+ }
+
+ // Stop file pruning when the number of files exceeds
+ // PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+ if (filesToFilter.size() <= optionManager.getOption(
+ PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+
+ matchAllMetadata = true;
+ files = filterAndGetMetadata(schemaPathsInExpr, filesToFilter.values(), filterPredicate, optionManager).stream()
+ .collect(Collectors.toMap(FileMetadata::getPath, Function.identity()));
+
+ files.putAll(omittedFiles);
+ } else {
+ matchAllMetadata = false;
+ files = prunedFiles;
+ overflowLevel = MetadataType.FILE;
+ }
+ }
+
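Note: filterFileMetadata above sets aside files with exactly one row group, since row group pruning will cover them anyway, and filters the rest only while their count stays within the planning threshold; past that point it keeps all pruned files and records FILE as the overflow level. The threshold gate in miniature; names here are illustrative, not Drill API:

import java.util.Collection;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ThresholdGateSketch {
  // Filter only while the candidate set is small enough to keep planning
  // cheap; otherwise return the input unchanged, signalling that pruning
  // was skipped.
  static <T> Collection<T> filterWithThreshold(Collection<T> candidates,
                                               Predicate<T> predicate,
                                               long threshold) {
    if (candidates.size() > threshold) {
      return candidates; // too many entries: skip pruning entirely
    }
    return candidates.stream()
        .filter(predicate)
        .collect(Collectors.toList());
  }
}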
+ /**
* Removes metadata which does not belong to any of segments in metadata list.
*
* @param metadataToPrune list of metadata which should be pruned
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
index 4df69d1..cc93873 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.parquet;
import org.apache.drill.exec.metastore.ParquetTableMetadataProvider;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.metastore.metadata.TableMetadataProviderBuilder;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -32,6 +33,9 @@ import java.util.List;
*/
public interface ParquetFileTableMetadataProviderBuilder extends TableMetadataProviderBuilder {
+ @Override
+ ParquetFileTableMetadataProviderBuilder withSchema(TupleMetadata schema);
+
ParquetFileTableMetadataProviderBuilder withEntries(List<ReadEntryWithPath> entries);
ParquetFileTableMetadataProviderBuilder withSelectionRoot(Path selectionRoot);
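
Note: the added withSchema override narrows the inherited return type so a fluent chain stays typed to the parquet builder. The pattern in miniature, with made-up interfaces:

import java.util.List;

// Covariant return types keep a fluent builder chain narrowly typed:
// the sub-interface re-declares an inherited method with its own type.
interface Builder {
  Builder withSchema(String schema);
}

interface ParquetBuilder extends Builder {
  @Override
  ParquetBuilder withSchema(String schema); // narrowed return type

  ParquetBuilder withEntries(List<String> entries);
}

Without the narrowed override, builder.withSchema(s).withEntries(e) would not compile, because withSchema would hand back the base Builder.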
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
index 6765a20..a9ee538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -29,7 +29,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdConverter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -93,7 +92,7 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
* A utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
* to DynamicPojoRecordReader during physical plan fragment deserialization.
*/
- public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader> {
+ public static class Converter<T> extends StdConverter<JsonNode, DynamicPojoRecordReader<T>> {
private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
new TypeReference<LinkedHashMap<String, Class<?>>>() {};
@@ -105,16 +104,22 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
}
@Override
- public DynamicPojoRecordReader convert(JsonNode value) {
- LinkedHashMap<String, Class<?>> schema = mapper.convertValue(value.get("schema"), schemaType);
-
- ArrayList records = new ArrayList(schema.size());
- final Iterator<JsonNode> recordsIterator = value.get("records").get(0).elements();
- for (Class<?> fieldType : schema.values()) {
- records.add(mapper.convertValue(recordsIterator.next(), fieldType));
+ public DynamicPojoRecordReader<T> convert(JsonNode value) {
+ LinkedHashMap<String, Class<T>> schema = mapper.convertValue(value.get("schema"), schemaType);
+ List<List<T>> records = new ArrayList<>();
+
+ JsonNode serializedRecords = value.get("records");
+ for (JsonNode serializedRecord : serializedRecords) {
+ List<T> record = new ArrayList<>(schema.size());
+ Iterator<JsonNode> recordsIterator = serializedRecord.elements();
+ for (Class<T> fieldType : schema.values()) {
+ record.add(mapper.convertValue(recordsIterator.next(), fieldType));
+ }
+ records.add(record);
}
+
int maxRecordsToRead = value.get("recordsPerBatch").asInt();
- return new DynamicPojoRecordReader(schema, Collections.singletonList(records), maxRecordsToRead);
+ return new DynamicPojoRecordReader(schema, records, maxRecordsToRead);
}
}
}
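
Note: the regenerified Converter now rebuilds every serialized record instead of wrapping only the first one in a singleton list. Assuming the JSON shape implied by the code above (a schema map of class names, a records array of rows, and recordsPerBatch), a payload it handles might look like the string below; the field values are hypothetical:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConverterPayloadDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical serialized reader: each element of "records" is one row,
    // deserialized field-by-field against the class names in "schema".
    String payload = "{"
        + "\"schema\": {\"dir0\": \"java.lang.String\", \"rowCount\": \"java.lang.Long\"},"
        + "\"records\": [[\"1994\", 40], [\"1995\", 60]],"
        + "\"recordsPerBatch\": 4000"
        + "}";
    JsonNode value = new ObjectMapper().readTree(payload);
    System.out.println(value.get("records").size()); // 2 rows, not just the first
  }
}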
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
index 38b0ae5..6b703c7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
@@ -85,7 +85,7 @@ public class TestAggregateFunction extends PopUnitTestBase {
public void testCovarianceCorrelation() throws Throwable {
String planPath = "/functions/test_covariance.json";
String dataPath = "/covariance_input.json";
- Double expectedValues[] = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d, 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d};
+ Double[] expectedValues = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d, 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d};
runTest(expectedValues, planPath, dataPath);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index f79757d..fcfd3fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1056,66 +1056,172 @@ public class TestAggregateFunctions extends ClusterTest {
}
@Test
- public void testCollectList() throws Exception {
- testBuilder()
- .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
- "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from (select * from cp.`tpch/nation.parquet` limit 2)")
- .unOrdered()
- .baselineColumns("l")
- .baselineValues(listOf(
- mapOf("n_nationkey", 0, "n_name", "ALGERIA",
- "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
- mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
- "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
- .go();
+ public void testCollectListStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
+ "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l " +
+ "from (select * from cp.`tpch/nation.parquet` limit 2)")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(listOf(
+ mapOf("n_nationkey", 0, "n_name", "ALGERIA",
+ "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
+ mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
+ "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
}
@Test
- public void testCollectToListVarchar() throws Exception {
- testBuilder()
- .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
- "(select * from cp.`store/json/clicks.json` limit 2)")
- .unOrdered()
- .baselineColumns("l")
- .baselineValues(listOf("2014-04-26", "2014-04-20"))
- .go();
+ public void testCollectListHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
+ "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l " +
+ "from (select * from cp.`tpch/nation.parquet` limit 2) group by 'a'")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(listOf(
+ mapOf("n_nationkey", 0, "n_name", "ALGERIA",
+ "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
+ mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
+ "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
}
@Test
- public void testSchemaFunction() throws Exception {
- TupleMetadata schema = new SchemaBuilder()
- .add("n_nationkey", TypeProtos.MinorType.INT)
- .add("n_name", TypeProtos.MinorType.VARCHAR)
- .add("n_regionkey", TypeProtos.MinorType.INT)
- .add("n_comment", TypeProtos.MinorType.VARCHAR)
- .build();
+ public void testCollectToListVarcharStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
+ "(select * from cp.`store/json/clicks.json` limit 2)")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(listOf("2014-04-26", "2014-04-20"))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
+ }
- testBuilder()
- .sqlQuery("select schema('n_nationkey', n_nationkey, " +
- "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
- "(select * from cp.`tpch/nation.parquet` limit 2)")
- .unOrdered()
- .baselineColumns("l")
- .baselineValues(schema.jsonString())
- .go();
+ @Test
+ public void testCollectToListVarcharHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
+ "(select * from cp.`store/json/clicks.json` limit 2) group by 'a'")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(listOf("2014-04-26", "2014-04-20"))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
}
@Test
- public void testMergeSchemaFunction() throws Exception {
- String schema = new SchemaBuilder()
- .add("n_nationkey", TypeProtos.MinorType.INT)
- .add("n_name", TypeProtos.MinorType.VARCHAR)
- .add("n_regionkey", TypeProtos.MinorType.INT)
- .add("n_comment", TypeProtos.MinorType.VARCHAR)
- .build()
- .jsonString();
+ public void testSchemaFunctionStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ TupleMetadata schema = new SchemaBuilder()
+ .add("n_nationkey", TypeProtos.MinorType.INT)
+ .add("n_name", TypeProtos.MinorType.VARCHAR)
+ .add("n_regionkey", TypeProtos.MinorType.INT)
+ .add("n_comment", TypeProtos.MinorType.VARCHAR)
+ .build();
- testBuilder()
- .sqlQuery("select merge_schema('%s') as l from " +
- "(select * from cp.`tpch/nation.parquet` limit 2)", schema)
- .unOrdered()
- .baselineColumns("l")
- .baselineValues(schema)
- .go();
+ testBuilder()
+ .sqlQuery("select schema('n_nationkey', n_nationkey, " +
+ "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
+ "(select * from cp.`tpch/nation.parquet` limit 2)")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(schema.jsonString())
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testSchemaFunctionHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ TupleMetadata schema = new SchemaBuilder()
+ .add("n_nationkey", TypeProtos.MinorType.INT)
+ .add("n_name", TypeProtos.MinorType.VARCHAR)
+ .add("n_regionkey", TypeProtos.MinorType.INT)
+ .add("n_comment", TypeProtos.MinorType.VARCHAR)
+ .build();
+
+ testBuilder()
+ .sqlQuery("select schema('n_nationkey', n_nationkey, " +
+ "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
+ "(select * from cp.`tpch/nation.parquet` limit 2) group by 'a'")
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(schema.jsonString())
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testMergeSchemaFunctionStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ String schema = new SchemaBuilder()
+ .add("n_nationkey", TypeProtos.MinorType.INT)
+ .add("n_name", TypeProtos.MinorType.VARCHAR)
+ .add("n_regionkey", TypeProtos.MinorType.INT)
+ .add("n_comment", TypeProtos.MinorType.VARCHAR)
+ .build()
+ .jsonString();
+
+ testBuilder()
+ .sqlQuery("select merge_schema('%s') as l from " +
+ "(select * from cp.`tpch/nation.parquet` limit 2)", schema)
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(schema)
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testMergeSchemaFunctionHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ String schema = new SchemaBuilder()
+ .add("n_nationkey", TypeProtos.MinorType.INT)
+ .add("n_name", TypeProtos.MinorType.VARCHAR)
+ .add("n_regionkey", TypeProtos.MinorType.INT)
+ .add("n_comment", TypeProtos.MinorType.VARCHAR)
+ .build()
+ .jsonString();
+
+ testBuilder()
+ .sqlQuery("select merge_schema('%s') as l from " +
+ "(select * from cp.`tpch/nation.parquet` limit 2) group by 'a'", schema)
+ .unOrdered()
+ .baselineColumns("l")
+ .baselineValues(schema)
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
}
}
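
Note: each aggregate function test above now runs twice, once with hash aggregation disabled to force the streaming aggregate and once with streaming disabled to force the hash aggregate. The hash variants also add group by 'a', a constant key, since the hash aggregate operator is only planned when at least one grouping key is present. A reusable sketch of the toggle, assuming the ClusterTest client fixture; the helper itself is illustrative and not part of the patch:

@FunctionalInterface
interface Check {
  void run() throws Exception;
}

// Disable one aggregator for the duration of a check, then restore the
// session option even if the assertion fails.
private void withOptionDisabled(String optionName, Check check) throws Exception {
  try {
    client.alterSession(optionName, false);
    check.run();
  } finally {
    client.resetSession(optionName);
  }
}

// illustrative usage:
// withOptionDisabled(PlannerSettings.HASHAGG.getOptionName(),
//     () -> testBuilder().sqlQuery(...).go());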
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
index 850b49d..a1cd2e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -18,22 +18,29 @@
package org.apache.drill.exec.physical.impl.agg;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.drill.test.TestBuilder;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import java.math.BigDecimal;
+import java.util.Arrays;
import java.util.List;
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+
@Category(OperatorTest.class)
@RunWith(Enclosed.class)
public class TestAggWithAnyValue {
@@ -43,7 +50,7 @@ public class TestAggWithAnyValue {
@Test
public void testStreamAggWithGroupBy() {
StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("age.`max`", "age"), parseExprs("any_value(a)", "any_a"));
- List<String> inputJsonBatches = Lists.newArrayList(
+ List<String> inputJsonBatches = Arrays.asList(
"[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," +
" \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," +
"\"p\": {\"q\": [21, 22, 23]}" + "}, " +
@@ -63,87 +70,258 @@ public class TestAggWithAnyValue {
.physicalOperator(aggConf)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("age", "any_a")
- .baselineValues(60l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)))
- .baselineValues(80l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)))
+ .baselineValues(60L,
+ listOf(
+ mapOf("b", 50L, "c", 30L),
+ mapOf("b", 70L, "c", 40L)))
+ .baselineValues(80L,
+ listOf(
+ mapOf("b", 10L, "c", 15L),
+ mapOf("b", 20L, "c", 45L)))
+ .go();
+ }
+
+ @Test
+ public void testHashAggWithGroupBy() {
+ HashAggregate aggConf = new HashAggregate(null,
+ OperatorPhase.PHASE_1of1,
+ parseExprs("age.`max`", "age"),
+ parseExprs("any_value(a)", "any_a"),
+ 1F);
+ List<String> inputJsonBatches = Arrays.asList(
+ "[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," +
+ " \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," +
+ "\"p\": {\"q\": [21, 22, 23]}}, " +
+ "{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"Castro Valley\", \"de\": \"987654321987654321987654321.12987654321\"," +
+ " \"a\": [{\"b\":60, \"c\":40},{\"b\":80, \"c\":50}], \"m\": [{\"n\": [13, 14, 15]}], \"f\": [{\"g\": {\"h\": [{\"k\": 90}, {\"k\": 100}]}}]," +
+ "\"p\": {\"q\": [24, 25, 26]}}]",
+ "[{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.00987654321\"," +
+ " \"a\": [{\"b\":10, \"c\":15}, {\"b\":20, \"c\":45}], \"m\": [{\"n\": [1, 2, 3]}], \"f\": [{\"g\": {\"h\": [{\"k\": 10}, {\"k\": 20}]}}]," +
+ "\"p\": {\"q\": [27, 28, 29]}}, " +
+ "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"San Carlos\", \"de\": \"987654321987654321987654321.11987654321\"," +
+ " \"a\": [{\"b\":30, \"c\":25}, {\"b\":40, \"c\":55}], \"m\": [{\"n\": [4, 5, 6]}], \"f\": [{\"g\": {\"h\": [{\"k\": 30}, {\"k\": 40}]}}]," +
+ "\"p\": {\"q\": [30, 31, 32]}}, " +
+ "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
+ " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
+ "\"p\": {\"q\": [33, 34, 35]}}]");
+ legacyOpTestBuilder()
+ .physicalOperator(aggConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("age", "any_a")
+ .baselineValues(60L,
+ listOf(
+ mapOf("b", 50L, "c", 30L),
+ mapOf("b", 70L, "c", 40L)))
+ .baselineValues(80L,
+ listOf(
+ mapOf("b", 10L, "c", 15L),
+ mapOf("b", 20L, "c", 45L)))
.go();
}
}
- public static class TestAggWithAnyValueSingleBatch extends BaseTestQuery {
+ public static class TestAggWithAnyValueSingleBatch extends ClusterTest {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ }
@Test
- public void testWithGroupBy() throws Exception {
- String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
- "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
- .baselineValues(60l, 2l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)), "San Bruno",
- TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 70l), TestBuilder.mapOf("k", 80l))))),
- TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(10l, 11l, 12l))),
- TestBuilder.mapOf("q", TestBuilder.listOf(21l, 22l, 23l)))
- .baselineValues(80l, 3l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
- TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
- TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
- TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
- .go();
+ public void testWithGroupByStreamAgg() throws Exception {
+ String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a," +
+ "any_value(t1.city) as any_city, any_value(f) as any_f, any_value(m) as any_m," +
+ "any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+ .baselineValues(60L, 2L,
+ listOf(
+ mapOf("b", 50L, "c", 30L),
+ mapOf("b", 70L, "c", 40L)),
+ "San Bruno",
+ listOf(
+ mapOf("g",
+ mapOf("h",
+ listOf(mapOf("k", 70L), mapOf("k", 80L))))),
+ listOf(mapOf("n", listOf(10L, 11L, 12L))),
+ mapOf("q", listOf(21L, 22L, 23L)))
+ .baselineValues(80L, 3L,
+ listOf(
+ mapOf("b", 10L, "c", 15L),
+ mapOf("b", 20L, "c", 45L)),
+ "Palo Alto",
+ listOf(mapOf("g",
+ mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+ listOf(mapOf("n", listOf(1L, 2L, 3L))),
+ mapOf("q", listOf(27L, 28L, 29L)))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testWithGroupByHashAgg() throws Exception {
+ String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a," +
+ "any_value(t1.city) as any_city, any_value(f) as any_f, any_value(m) as any_m," +
+ "any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+ .baselineValues(60L, 2L,
+ listOf(
+ mapOf("b", 50L, "c", 30L),
+ mapOf("b", 70L, "c", 40L)),
+ "San Bruno",
+ listOf(
+ mapOf("g",
+ mapOf("h",
+ listOf(mapOf("k", 70L), mapOf("k", 80L))))),
+ listOf(mapOf("n", listOf(10L, 11L, 12L))),
+ mapOf("q", listOf(21L, 22L, 23L)))
+ .baselineValues(80L, 3L,
+ listOf(
+ mapOf("b", 10L, "c", 15L),
+ mapOf("b", 20L, "c", 45L)),
+ "Palo Alto",
+ listOf(mapOf("g",
+ mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+ listOf(mapOf("n", listOf(1L, 2L, 3L))),
+ mapOf("q", listOf(27L, 28L, 29L)))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
}
@Test
public void testWithoutGroupBy() throws Exception {
String query = "select count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
- "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1";
+ "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p " +
+ "from cp.`store/json/test_anyvalue.json` t1";
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
- .baselineValues(5l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
- TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
- TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
- TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
+ .baselineValues(5L,
+ listOf(
+ mapOf("b", 10L, "c", 15L),
+ mapOf("b", 20L, "c", 45L)),
+ "Palo Alto",
+ listOf(mapOf("g", mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+ listOf(mapOf("n", listOf(1L, 2L, 3L))),
+ mapOf("q", listOf(27L, 28L, 29L)))
.go();
}
@Test
- public void testDecimalWithGroupBy() throws Exception {
- String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
- "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("age", "any_decimal")
- .baselineValues(60l, new BigDecimal("987654321987654321987654321.10987654321"))
- .baselineValues(80l, new BigDecimal("987654321987654321987654321.00987654321"))
- .go();
+ public void testDecimalWithGroupByStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
+ "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "any_decimal")
+ .baselineValues(60L, new BigDecimal("987654321987654321987654321.10987654321"))
+ .baselineValues(80L, new BigDecimal("987654321987654321987654321.00987654321"))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
}
@Test
- public void testRepeatedDecimalWithGroupBy() throws Exception {
- JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
- ints.add(new BigDecimal("999999.999"));
- ints.add(new BigDecimal("-999999.999"));
- ints.add(new BigDecimal("0.000"));
-
- JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
- longs.add(new BigDecimal("999999999.999999999"));
- longs.add(new BigDecimal("-999999999.999999999"));
- longs.add(new BigDecimal("0.000000000"));
-
- JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
- fixedLen.add(new BigDecimal("999999999999.999999"));
- fixedLen.add(new BigDecimal("-999999999999.999999"));
- fixedLen.add(new BigDecimal("0.000000"));
-
- String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
- " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
- " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`";
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
- .baselineValues(ints, longs, fixedLen, fixedLen)
- .go();
+ public void testDecimalWithGroupByHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
+ "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("age", "any_decimal")
+ .baselineValues(60L, new BigDecimal("987654321987654321987654321.10987654321"))
+ .baselineValues(80L, new BigDecimal("987654321987654321987654321.00987654321"))
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testRepeatedDecimalWithGroupByStreamAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+
+ String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
+ " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
+ " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet` group by 'a'";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+ }
+ }
+
+ @Test
+ public void testRepeatedDecimalWithGroupByHashAgg() throws Exception {
+ try {
+ client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+
+ String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
+ " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
+ " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet` group by 'a'";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ } finally {
+ client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+ }
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
index fa60562..910039f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
@@ -42,7 +42,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
@Category(OperatorTest.class)
public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
@@ -73,11 +74,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
* @param outputRowCounts - expected number of rows in each output batch
* @param outputOutcomes - the expected output outcomes
*/
- private void testHashAggrEmit(int inp2_1[], int inp2_2[], String inp2_3[], // first input batch
- int inp3_1[], int inp3_2[], String inp3_3[], // second input batch
- String exp1_1[], int exp1_2[], // first expected
- String exp2_1[], int exp2_2[], // second expected
- int inpRowSet[], RecordBatch.IterOutcome inpOutcomes[], // input batches + outcomes
+ private void testHashAggrEmit(int[] inp2_1, int[] inp2_2, String[] inp2_3, // first input batch
+ int[] inp3_1, int[] inp3_2, String[] inp3_3, // second input batch
+ String[] exp1_1, int[] exp1_2, // first expected
+ String[] exp2_1, int[] exp2_2, // second expected
+ int[] inpRowSet, RecordBatch.IterOutcome[] inpOutcomes, // input batches + outcomes
List<Integer> outputRowCounts, // output row counts per each out batch
List<RecordBatch.IterOutcome> outputOutcomes) // output outcomes
{
@@ -134,12 +135,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
case 3: inputContainer.add(nonEmptyInputRowSet3.container());
break;
default:
- assertTrue(false);
+ fail();
}
}
- for (RecordBatch.IterOutcome out : inpOutcomes) { // build the outcomes
- inputOutcomes.add(out);
- }
+ // build the outcomes
+ inputOutcomes.addAll(Arrays.asList(inpOutcomes));
//
// Build the Hash Agg Batch operator
@@ -158,20 +158,21 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
//
// Iterate thru the next batches, and verify expected outcomes
//
- assertTrue( outputRowCounts.size() == outputOutcomes.size());
+ assertEquals(outputRowCounts.size(), outputOutcomes.size());
boolean firstOne = true;
- for (int ind = 0; ind < outputOutcomes.size(); ind++ ) {
+ for (int ind = 0; ind < outputOutcomes.size(); ind++) {
RecordBatch.IterOutcome expOut = outputOutcomes.get(ind);
- assertTrue(haBatch.next() == expOut );
- if ( expOut == NONE ) { break; } // done
+ assertSame(expOut, haBatch.next());
+ if (expOut == NONE) {
+ break;
+ } // done
RowSet actualRowSet = DirectRowSet.fromContainer(haBatch.getContainer());
int expectedSize = outputRowCounts.get(ind);
// System.out.println(expectedSize);
- if ( 0 == expectedSize ) {
+ if (0 == expectedSize) {
assertEquals(expectedSize, haBatch.getRecordCount());
- }
- else if ( firstOne ) {
+ } else if (firstOne) {
firstOne = false;
new RowSetComparison(expectedRowSet1).verify(actualRowSet);
} else {
@@ -197,8 +198,8 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrWithEmptyDataSet() {
- int inpRowSet[] = {0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA};
+ int[] inpRowSet = {0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA};
List<Integer> outputRowCounts = Arrays.asList(0, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, NONE);
@@ -213,8 +214,8 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrEmptyBatchEmitOutcome() {
- int inpRowSet[] = {0, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT};
+ int[] inpRowSet = {0, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 0, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -230,15 +231,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrNonEmptyBatchEmitOutcome() {
- int inp2_1[] = { 2, 2, 13, 13, 4};
- int inp2_2[] = {20, 20, 130, 130, 40};
- String inp2_3[] = {"item2", "item2", "item13", "item13", "item4"};
+ int[] inp2_1 = {2, 2, 13, 13, 4};
+ int[] inp2_2 = {20, 20, 130, 130, 40};
+ String[] inp2_3 = {"item2", "item2", "item13", "item13", "item4"};
- String exp1_1[] = {"item2", "item13", "item4"};
- int exp1_2[] = { 44, 286, 44};
+ String[] exp1_1 = {"item2", "item13", "item4"};
+ int[] exp1_2 = {44, 286, 44};
- int inpRowSet[] = {0, 2};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT};
+ int[] inpRowSet = {0, 2};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 3, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -252,15 +253,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
- int inp2_1[] = {2, 13, 4, 0, 0, 0};
- int inp2_2[] = {20, 130, 40, 2000, 1300, 4000};
- String inp2_3[] = {"item2", "item13", "item4", "item2", "item13", "item4"};
+ int[] inp2_1 = {2, 13, 4, 0, 0, 0};
+ int[] inp2_2 = {20, 130, 40, 2000, 1300, 4000};
+ String[] inp2_3 = {"item2", "item13", "item4", "item2", "item13", "item4"};
- String exp1_1[] = {"item2", "item13", "item4"};
- int exp1_2[] = {2022, 1443, 4044};
+ String[] exp1_1 = {"item2", "item13", "item4"};
+ int[] exp1_2 = {2022, 1443, 4044};
- int inpRowSet[] = {0, 0, 2};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT};
+ int[] inpRowSet = {0, 0, 2};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 0, 3, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -274,15 +275,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
- int inp2_1[] = {2, 13, 4, 0, 1, 0, 1};
- int inp2_2[] = {20, 130, 40, 0, 11000, 0, 33000};
- String inp2_3[] = {"item2", "item13", "item4", "item2", "item2", "item13", "item13"};
+ int[] inp2_1 = {2, 13, 4, 0, 1, 0, 1};
+ int[] inp2_2 = {20, 130, 40, 0, 11000, 0, 33000};
+ String[] inp2_3 = {"item2", "item13", "item4", "item2", "item2", "item13", "item13"};
- String exp1_1[] = {"item2", "item13", "item4"};
- int exp1_2[] = {11023, 33144, 44};
+ String[] exp1_1 = {"item2", "item13", "item4"};
+ int[] exp1_2 = {11023, 33144, 44};
- int inpRowSet[] = {0, 0, 0, 0, 2};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT};
+ int[] inpRowSet = {0, 0, 0, 0, 2};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 0, 0, 0, 3, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT, NONE);
@@ -300,18 +301,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAgrResetsAfterFirstEmitOutcome() {
- int inp2_1[] = {2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2 };
- int inp2_2[] = {20, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 20 };
- String inp2_3[] = {"item2", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item2"};
+ int[] inp2_1 = {2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2};
+ int[] inp2_2 = {20, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 20};
+ String[] inp2_3 = {"item2", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item2"};
- String exp1_1[] = {"item1"};
- int exp1_2[] = {11};
+ String[] exp1_1 = {"item1"};
+ int[] exp1_2 = {11};
- String exp2_1[] = {"item2", "item3"};
- int exp2_2[] = {44, 330};
+ String[] exp2_1 = {"item2", "item3"};
+ int[] exp2_2 = {44, 330};
- int inpRowSet[] = {1, 0, 2, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, OK, EMIT};
+ int[] inpRowSet = {1, 0, 2, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, OK, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 1, 2, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -327,11 +328,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
@Test
public void testHashAggr_NonEmptyFirst_EmptyOKEmitOutcome() {
- String exp1_1[] = {"item1"};
- int exp1_2[] = {11};
+ String[] exp1_1 = {"item1"};
+ int[] exp1_2 = {11};
- int inpRowSet[] = {1, 0, 0, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, EMIT, NONE};
+ int[] inpRowSet = {1, 0, 0, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, EMIT, NONE};
List<Integer> outputRowCounts = Arrays.asList(0, 1, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -349,18 +350,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrMultipleOutputBatch() {
- int inp2_1[] = {4, 2, 5, 3, 5, 4};
- int inp2_2[] = {40, 20, 50, 30, 50, 40};
- String inp2_3[] = {"item4", "item2", "item5", "item3", "item5", "item4"};
+ int[] inp2_1 = {4, 2, 5, 3, 5, 4};
+ int[] inp2_2 = {40, 20, 50, 30, 50, 40};
+ String[] inp2_3 = {"item4", "item2", "item5", "item3", "item5", "item4"};
- String exp1_1[] = {"item1"};
- int exp1_2[] = {11};
+ String[] exp1_1 = {"item1"};
+ int[] exp1_2 = {11};
- String exp2_1[] = {"item4", "item2", "item5", "item3"};
- int exp2_2[] = { 88, 22, 110, 33};
+ String[] exp2_1 = {"item4", "item2", "item5", "item3"};
+ int[] exp2_2 = {88, 22, 110, 33};
- int inpRowSet[] = {1, 0, 2};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, OK};
+ int[] inpRowSet = {1, 0, 2};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, OK};
List<Integer> outputRowCounts = Arrays.asList(0, 1, 4, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, OK, NONE);
@@ -374,18 +375,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrMultipleEMITOutcome() {
- int inp2_1[] = {2, 3};
- int inp2_2[] = {20, 30};
- String inp2_3[] = {"item2", "item3"};
+ int[] inp2_1 = {2, 3};
+ int[] inp2_2 = {20, 30};
+ String[] inp2_3 = {"item2", "item3"};
- String exp1_1[] = {"item1"};
- int exp1_2[] = {11};
+ String[] exp1_1 = {"item1"};
+ int[] exp1_2 = {11};
- String exp2_1[] = {"item2", "item3"};
- int exp2_2[] = { 22, 33};
+ String[] exp2_1 = {"item2", "item3"};
+ int[] exp2_2 = {22, 33};
- int inpRowSet[] = {1, 0, 2, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT};
+ int[] inpRowSet = {1, 0, 2, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 1, 2, 0, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, EMIT, NONE);
@@ -399,15 +400,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrMultipleInputToSingleOutputBatch() {
- int inp2_1[] = {2};
- int inp2_2[] = {20};
- String inp2_3[] = {"item2"};
+ int[] inp2_1 = {2};
+ int[] inp2_2 = {20};
+ String[] inp2_3 = {"item2"};
- String exp1_1[] = {"item1", "item2"};
- int exp1_2[] = { 11, 22};
+ String[] exp1_1 = {"item1", "item2"};
+ int[] exp1_2 = {11, 22};
- int inpRowSet[] = {1, 0, 2, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, OK, EMIT};
+ int[] inpRowSet = {1, 0, 2, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, OK, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 2, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -421,22 +422,22 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggrMultipleInputToMultipleOutputBatch() {
- int inp2_1[] = {7, 2, 7, 3};
- int inp2_2[] = {70, 20, 70, 33};
- String inp2_3[] = {"item7", "item1", "item7", "item3"};
+ int[] inp2_1 = {7, 2, 7, 3};
+ int[] inp2_2 = {70, 20, 70, 33};
+ String[] inp2_3 = {"item7", "item1", "item7", "item3"};
- int inp3_1[] = {17, 7, 3, 13, 9, 13};
- int inp3_2[] = {170, 71, 30, 130, 123, 130};
- String inp3_3[] = {"item17", "item7", "item3", "item13", "item3", "item13"};
+ int[] inp3_1 = {17, 7, 3, 13, 9, 13};
+ int[] inp3_2 = {170, 71, 30, 130, 123, 130};
+ String[] inp3_3 = {"item17", "item7", "item3", "item13", "item3", "item13"};
- String exp1_1[] = {"item1", "item7", "item3"};
- int exp1_2[] = { 33, 154, 36};
+ String[] exp1_1 = {"item1", "item7", "item3"};
+ int[] exp1_2 = {33, 154, 36};
- String exp2_1[] = {"item17", "item7", "item3", "item13"};
- int exp2_2[] = { 187, 78, 165, 286};
+ String[] exp2_1 = {"item17", "item7", "item3", "item13"};
+ int[] exp2_2 = {187, 78, 165, 286};
- int inpRowSet[] = {1, 0, 2, 0, 3, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, EMIT, OK, OK, EMIT};
+ int[] inpRowSet = {1, 0, 2, 0, 3, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, EMIT, OK, OK, EMIT};
List<Integer> outputRowCounts = Arrays.asList(0, 3, 4, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -452,19 +453,19 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
*/
@Test
public void testHashAggr_WithEmptyNonEmptyBatchesAndOKOutcome() {
- int inp2_1[] = { 2, 7, 3, 13, 13, 13};
- int inp2_2[] = { 20, 70, 33, 130, 130, 130};
- String inp2_3[] = {"item1", "item7", "item3", "item13", "item13", "item13"};
+ int[] inp2_1 = {2, 7, 3, 13, 13, 13};
+ int[] inp2_2 = {20, 70, 33, 130, 130, 130};
+ String[] inp2_3 = {"item1", "item7", "item3", "item13", "item13", "item13"};
- int inp3_1[] = { 17, 23, 130, 0};
- int inp3_2[] = { 170, 230, 1300, 0};
- String inp3_3[] = {"item7", "item23", "item130", "item130"};
+ int[] inp3_1 = {17, 23, 130, 0};
+ int[] inp3_2 = {170, 230, 1300, 0};
+ String[] inp3_3 = {"item7", "item23", "item130", "item130"};
- String exp1_1[] = {"item1", "item7", "item3", "item13", "item23", "item130"};
- int exp1_2[] = { 33, 264, 36, 429, 253, 1430};
+ String[] exp1_1 = {"item1", "item7", "item3", "item13", "item23", "item130"};
+ int[] exp1_2 = {33, 264, 36, 429, 253, 1430};
- int inpRowSet[] = {1, 0, 2, 0, 3, 0};
- RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, OK, OK, OK, OK};
+ int[] inpRowSet = {1, 0, 2, 0, 3, 0};
+ RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, OK, OK, OK, OK};
List<Integer> outputRowCounts = Arrays.asList(0, 6, 0);
List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, OK, NONE);
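
Note: the assertion cleanups in this file trade assertTrue(a == b) for assertSame and size comparisons for assertEquals, so failures report both expected and actual values instead of just "expected true". A minimal illustration with made-up values:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

enum Outcome { OK, NONE }

public class AssertMessageDemo {
  public static void main(String[] args) {
    Outcome expected = Outcome.OK;
    Outcome actual = Outcome.OK;
    assertSame(expected, actual);            // identity check with a useful failure message
    assertEquals(2, Integer.parseInt("2"));  // value check with a useful failure message
  }
}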
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index e1ece37..c039d34 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -224,7 +224,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.tmp.`%s` as\n" +
"select * from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
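Note: from here on, ANALYZE statements are issued through testBuilder rather than run(), so each command's single (ok, summary) result row becomes part of the test contract. A condensed, illustrative helper for the dfs.tmp cases, assuming it lives in a ClusterTest subclass (the bare dfs workspace prints as dfs.default in the summary):

// Hypothetical helper; not part of the patch.
private void verifyAnalyzeInTmp(String tableName) throws Exception {
  testBuilder()
      .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA", tableName)
      .unOrdered()
      .baselineColumns("ok", "summary")
      .baselineValues(true, String.format(
          "Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
      .go();
}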
String query = "select mykey from dfs.tmp.`%s` where mykey is null";
@@ -361,7 +366,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -500,7 +510,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name());
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -530,7 +545,12 @@ public class TestMetastoreCommands extends ClusterTest {
for (MetadataType analyzeLevel : analyzeLevels) {
try {
- run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name());
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
List<String> emptyMetadataLevels = Arrays.stream(MetadataType.values())
.filter(metadataType -> metadataType.compareTo(analyzeLevel) > 0
@@ -594,7 +614,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -639,7 +664,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` columns NONE REFRESH METADATA 'row_group' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns NONE REFRESH METADATA 'row_group' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -688,7 +718,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -753,7 +788,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -826,7 +866,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -1658,7 +1703,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA 'file' LEVEL", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA 'file' LEVEL", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -1795,7 +1845,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -1883,7 +1938,12 @@ public class TestMetastoreCommands extends ClusterTest {
.build();
try {
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -1905,7 +1965,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.tmp.`%s`\n" +
@@ -1936,7 +2001,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -1968,20 +2038,25 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.%s (o_orderdate, o_orderpriority) partition by (o_orderpriority)\n"
+ "as select o_orderdate, o_orderpriority from dfs.`multilevel/parquet/1994/Q1`", tableName);
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query = "select * from dfs.%s where o_orderpriority = '1-URGENT'";
long expectedRowCount = 3;
- int expectedNumFiles = 1;
long actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
assertEquals(expectedRowCount, actualRowCount);
- String numFilesPattern = "numFiles=" + expectedNumFiles;
String usedMetaPattern = "usedMetastore=true";
+ // do not match the expected number of files since CTAS may create
+ // a different number of files due to the small planner.slice_target value
queryBuilder().sql(query, tableName)
.planMatcher()
- .include(numFilesPattern, usedMetaPattern)
+ .include(usedMetaPattern)
.exclude("Filter")
.match();
} finally {
@@ -1999,20 +2074,26 @@ public class TestMetastoreCommands extends ClusterTest {
+ "as select o_orderdate, convert_to(o_orderpriority, 'UTF8') as o_orderpriority\n"
+ "from dfs.`multilevel/parquet/1994/Q1`", tableName);
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
+
String query = String.format("select * from dfs.%s where o_orderpriority = '1-URGENT'", tableName);
long expectedRowCount = 3;
- int expectedNumFiles = 1;
long actualRowCount = queryBuilder().sql(query).run().recordCount();
assertEquals(expectedRowCount, actualRowCount);
- String numFilesPattern = "numFiles=" + expectedNumFiles;
String usedMetaPattern = "usedMetastore=true";
+ // do not check the expected number of files since CTAS may create
+ // a different number of files due to the small planner.slice_target value
queryBuilder().sql(query, tableName)
.planMatcher()
- .include(numFilesPattern, usedMetaPattern)
+ .include(usedMetaPattern)
.exclude("Filter")
.match();
} finally {
@@ -2028,7 +2109,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2058,7 +2144,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2089,7 +2180,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2118,7 +2214,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query =
"select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2150,7 +2251,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.`%s` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table dfs.`%1$s/%1$s` as select * from cp.`tpch/nation.parquet`", tableName);
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query = "select * from dfs.`%s`";
long expectedRowCount = 50;
@@ -2174,7 +2280,7 @@ public class TestMetastoreCommands extends ClusterTest {
@Test
public void testFieldWithDots() throws Exception {
- String tableName = "dfs.tmp.`complex_table`";
+ String tableName = "dfs.tmp.complex_table";
try {
run("create table %s as\n" +
"select cast(1 as int) as `column.with.dots`, t.`column`.`with.dots`\n" +
@@ -2191,7 +2297,12 @@ public class TestMetastoreCommands extends ClusterTest {
.include("usedMetastore=false")
.match();
- run("analyze table %s REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
@@ -2208,13 +2319,18 @@ public class TestMetastoreCommands extends ClusterTest {
@Test
public void testBooleanPartitionPruning() throws Exception {
- String tableName = "dfs.tmp.`interval_bool_partition`";
+ String tableName = "dfs.tmp.interval_bool_partition";
try {
run("create table %s partition by (col_bln) as\n" +
"select * from cp.`parquet/alltypes_required.parquet`", tableName);
- run("analyze table %s REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
String query = "select * from %s where col_bln = true";
int expectedRowCount = 2;
@@ -2249,7 +2365,12 @@ public class TestMetastoreCommands extends ClusterTest {
"union all\n" +
"select col_notexist from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
String query = "select mykey from dfs.tmp.`t5` where mykey = 100";
long actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
@@ -2278,7 +2399,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.tmp.`%s/b` as\n" +
"select case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
String query = "select mykey from dfs.tmp.`%s` where mykey is null";
@@ -2308,7 +2434,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.tmp.`%s/b` as\n" +
"select case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
String query = "select mykey from dfs.tmp.`%s` where mykey is null";
@@ -2338,7 +2469,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.tmp.`%s/b` as\n" +
"select case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
String query = "select mykey from dfs.tmp.`%s` where mykey is null";
@@ -2364,7 +2500,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
String query = "select * from dfs.`%s`";
long expectedRowCount = 120;
@@ -2387,13 +2528,18 @@ public class TestMetastoreCommands extends ClusterTest {
}
@Test
- public void testAnalyzeWithFallbackError() throws Exception {
+ public void testAnalyzeWithDisabledFallback() throws Exception {
String tableName = "parquetAnalyzeWithFallback";
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
client.alterSession(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA, false);
queryBuilder()
@@ -2414,7 +2560,12 @@ public class TestMetastoreCommands extends ClusterTest {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+ .go();
client.alterSession(ExecConstants.METASTORE_USE_SCHEMA_METADATA, false);
queryBuilder()
@@ -2438,7 +2589,12 @@ public class TestMetastoreCommands extends ClusterTest {
client.alterSession(ExecConstants.METASTORE_USE_SCHEMA_METADATA, false);
client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
run("create table %s as select 'a' as c from (values(1))", table);
- run("analyze table %s REFRESH METADATA", table);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", table)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", table))
+ .go();
run("create schema (o_orderstatus varchar) for table %s", table);
@@ -2460,7 +2616,12 @@ public class TestMetastoreCommands extends ClusterTest {
client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
- run("analyze table %s REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
String query = " select employee_id from %s where department_id = 2";
@@ -2487,7 +2648,12 @@ public class TestMetastoreCommands extends ClusterTest {
client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), false);
- run("analyze table %s REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
String query = "select employee_id from %s where department_id = 2";
@@ -2515,7 +2681,12 @@ public class TestMetastoreCommands extends ClusterTest {
client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), false);
- run("analyze table %s REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
@@ -2546,7 +2717,12 @@ public class TestMetastoreCommands extends ClusterTest {
client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
- run("ANALYZE TABLE %s COLUMNS(department_id) REFRESH METADATA COMPUTE STATISTICS SAMPLE 95 PERCENT", tableName);
+ testBuilder()
+ .sqlQuery("ANALYZE TABLE %s COLUMNS(department_id) REFRESH METADATA COMPUTE STATISTICS SAMPLE 95 PERCENT", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+ .go();
String query = "select employee_id from %s where department_id = 2";
@@ -2572,7 +2748,12 @@ public class TestMetastoreCommands extends ClusterTest {
run("create table dfs.tmp.`%s` as\n" +
"select * from cp.`tpch/region.parquet`", tableName);
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
MetastoreTableInfo metastoreTableInfo = cluster.drillbit().getContext()
.getMetastoreRegistry()
@@ -2712,7 +2893,12 @@ public class TestMetastoreCommands extends ClusterTest {
File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
File fileToUpdate = new File(new File(new File(table, "1994"), "Q4"), "orders_94_q4.parquet");
long lastModified = fileToUpdate.lastModified();
@@ -2756,7 +2942,12 @@ public class TestMetastoreCommands extends ClusterTest {
File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
try {
- run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+ testBuilder()
+ .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+ .go();
dirTestWatcher.copyResourceToTestTmp(
Paths.get("multilevel", "parquet", "1994", "Q1", "orders_94_q1.parquet"),
diff --git a/exec/java-exec/src/test/resources/functions/test_covariance.json b/exec/java-exec/src/test/resources/functions/test_covariance.json
index 0d74f70..33f9567 100644
--- a/exec/java-exec/src/test/resources/functions/test_covariance.json
+++ b/exec/java-exec/src/test/resources/functions/test_covariance.json
@@ -64,10 +64,10 @@
"ref" : "`EXPR$7`",
"expr" : "corr(`A`, `B`) "
}, {
- "ref" : "`EXPR$7`",
+ "ref" : "`EXPR$8`",
"expr" : "corr(`A`, `C`) "
}, {
- "ref" : "`EXPR$8`",
+ "ref" : "`EXPR$9`",
"expr" : "corr(`C`, `D`) "
} ]
}, {
@@ -75,4 +75,4 @@
"@id" : 4,
"child" : 3
} ]
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/resources/functions/test_logical_aggr.json b/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
index f6d83b5..6e071c5 100644
--- a/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
+++ b/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
@@ -61,10 +61,10 @@
"ref" : "`EXPR$5`",
"expr" : "bit_or(`D`) "
}, {
- "ref" : "`EXPR$5`",
+ "ref" : "`EXPR$6`",
"expr" : "bool_or(`C`) "
}, {
- "ref" : "`EXPR$6`",
+ "ref" : "`EXPR$7`",
"expr" : "bool_and(`C`) "
} ]
}, {
@@ -72,4 +72,4 @@
"@id" : 4,
"child" : 3
} ]
-}
\ No newline at end of file
+}
diff --git a/exec/vector/src/main/codegen/templates/ComplexWriters.java b/exec/vector/src/main/codegen/templates/ComplexWriters.java
index 3539c62..904c825 100644
--- a/exec/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/vector/src/main/codegen/templates/ComplexWriters.java
@@ -106,7 +106,11 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
public void setPosition(int idx) {
super.setPosition(idx);
- mutator.startNewValue(idx);
+ // call startNewValue only when it would not
+ // override data that was already written
+ if (idx >= vector.getAccessor().getValueCount()) {
+ mutator.startNewValue(idx);
+ }
}
<#else>
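[Editorial note: the setPosition() change above guards startNewValue() so that re-positioning a writer over already-written data no longer resets it. A self-contained model of the guard in plain Java, not Drill's generated writer classes; valueCount stands in for vector.getAccessor().getValueCount().]

  // Model of the guard; only the comparison against the current value count
  // decides whether a new entry is started.
  class RepeatedWriterModel {
    private int valueCount = 0;

    void setPosition(int idx) {
      // start a new value only past existing data, so earlier entries survive
      if (idx >= valueCount) {
        startNewValue(idx);
      }
    }

    private void startNewValue(int idx) {
      valueCount = idx + 1; // a new (empty) entry now exists at idx
    }
  }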
diff --git a/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java b/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
index 8d4db48..30f6975 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
@@ -95,7 +95,7 @@ public abstract class FunctionHolderExpression extends LogicalExpressionBase {
*
* @param fieldReference FieldReference to set.
*/
- public void getFieldReference(FieldReference fieldReference) {
+ public void setFieldReference(FieldReference fieldReference) {
this.fieldReference = fieldReference;
}
}
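[Editorial note: the rename above fixes a mutator that was mis-named as a getter. A hedged usage sketch of the corrected name; the helper method and column name are assumptions for illustration.]

  import org.apache.drill.common.expression.FieldReference;
  import org.apache.drill.common.expression.FunctionHolderExpression;

  class Example {
    // previously this mutator was confusingly named getFieldReference(...)
    static void tagOutput(FunctionHolderExpression aggCall) {
      aggCall.setFieldReference(FieldReference.getWithQuotedRef("summary_col"));
    }
  }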
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java b/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
index 8d065bb..183c9b2 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
@@ -33,6 +33,6 @@ public class MetadataAggregate extends SingleInputOperator {
@Override
public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
- throw new UnsupportedOperationException("MetadataController does not support visitors");
+ throw new UnsupportedOperationException("MetadataAggregate does not support visitors");
}
}