You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/06/22 21:14:32 UTC
[drill] 01/09: DRILL-5188: Expand sub-queries using rules
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 502d2977092eecda0a4aa0482b5f96459c315227
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Fri May 18 15:54:16 2018 +0300
DRILL-5188: Expand sub-queries using rules
- Add check for agg with group by literal
- Allow NLJ for limit 1
- Implement single_value aggregate function
closes #1321
---
exec/java-exec/src/main/codegen/config.fmpp | 1 +
.../src/main/codegen/data/SingleValue.tdd | 62 +++++++++
.../src/main/codegen/templates/SingleValueAgg.java | 144 +++++++++++++++++++++
.../drill/exec/physical/impl/join/JoinUtils.java | 105 ++++++++++++++-
.../apache/drill/exec/planner/PlannerPhase.java | 10 ++
.../apache/drill/exec/planner/RuleInstance.java | 10 ++
.../exec/planner/logical/PreProcessLogicalRel.java | 18 +--
.../drill/exec/planner/sql/SqlConverter.java | 8 +-
.../planner/sql/handlers/DefaultSqlHandler.java | 12 +-
.../java/org/apache/drill/TestCorrelation.java | 46 +++++++
.../apache/drill/TestDisabledFunctionality.java | 37 ------
.../java/org/apache/drill/TestExampleQueries.java | 33 +++++
.../java/org/apache/drill/TestTpchDistributed.java | 1 -
.../apache/drill/TestTpchDistributedStreaming.java | 1 -
.../java/org/apache/drill/TestTpchExplain.java | 1 -
.../test/java/org/apache/drill/TestTpchLimit0.java | 1 -
.../java/org/apache/drill/TestTpchPlanning.java | 1 -
.../java/org/apache/drill/TestTpchSingleMode.java | 1 -
.../drill/exec/fn/impl/TestAggregateFunctions.java | 80 ++++++++++++
pom.xml | 2 +-
20 files changed, 504 insertions(+), 70 deletions(-)
diff --git a/exec/java-exec/src/main/codegen/config.fmpp b/exec/java-exec/src/main/codegen/config.fmpp
index 50f110d..e233974 100644
--- a/exec/java-exec/src/main/codegen/config.fmpp
+++ b/exec/java-exec/src/main/codegen/config.fmpp
@@ -43,6 +43,7 @@ data: {
intervalNumericTypes: tdd(../data/IntervalNumericTypes.tdd),
extract: tdd(../data/ExtractTypes.tdd),
sumzero: tdd(../data/SumZero.tdd),
+ singleValue: tdd(../data/SingleValue.tdd),
numericTypes: tdd(../data/NumericTypes.tdd),
casthigh: tdd(../data/CastHigh.tdd),
countAggrTypes: tdd(../data/CountAggrTypes.tdd)
diff --git a/exec/java-exec/src/main/codegen/data/SingleValue.tdd b/exec/java-exec/src/main/codegen/data/SingleValue.tdd
new file mode 100644
index 0000000..a42fe3b
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/data/SingleValue.tdd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+types: [
+ {inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "primitive"},
+ {inputType: "TinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"},
+ {inputType: "NullableTinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"},
+ {inputType: "UInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"},
+ {inputType: "NullableUInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"},
+ {inputType: "UInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"},
+ {inputType: "NullableUInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"},
+ {inputType: "SmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"},
+ {inputType: "NullableSmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"},
+ {inputType: "UInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"},
+ {inputType: "NullableUInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"},
+ {inputType: "UInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"},
+ {inputType: "NullableUInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"},
+ {inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "primitive"},
+ {inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"},
+ {inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "primitive"},
+ {inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "primitive"},
+ {inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"},
+ {inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"},
+ {inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: "primitive"},
+ {inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"},
+ {inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "primitive"},
+ {inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "primitive"},
+ {inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "primitive"},
+ {inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"},
+ {inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"},
+ {inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "primitive"},
+ {inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "primitive"},
+ {inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"},
+ {inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"},
+ {inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"},
+ {inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"},
+ {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"},
+ {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"},
+ {inputType: "VarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"},
+ {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"},
+ {inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"},
+ {inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"},
+ {inputType: "Var16Char", outputType: "NullableVar16Char", runningType: "Var16Char", major: "bytes"},
+ {inputType: "NullableVar16Char", outputType: "NullableVar16Char", runningType: "Var16Char", major: "bytes"},
+ {inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"},
+ {inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"}
+ ]
+}
diff --git a/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java
new file mode 100644
index 0000000..c0ff6cf
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/SingleValueFunctions.java" />
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.expr.fn.impl.gaggr;
+
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+
+import javax.inject.Inject;
+import io.netty.buffer.DrillBuf;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+public class SingleValueFunctions {
+<#list singleValue.types as type>
+
+ @FunctionTemplate(name = "single_value",
+ <#if type.major == "VarDecimal">
+ returnType = FunctionTemplate.ReturnType.DECIMAL_AVG_AGGREGATE,
+ </#if>
+ scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class ${type.inputType}SingleValue implements DrillAggFunc {
+ @Param ${type.inputType}Holder in;
+ @Workspace ${type.runningType}Holder value;
+ @Output ${type.outputType}Holder out;
+ @Workspace BigIntHolder nonNullCount;
+ <#if type.major == "VarDecimal" || type.major == "bytes">
+ @Inject DrillBuf buffer;
+ </#if>
+
+ public void setup() {
+ nonNullCount = new BigIntHolder();
+ nonNullCount.value = 0;
+ value = new ${type.runningType}Holder();
+ }
+
+ @Override
+ public void add() {
+ <#if type.inputType?starts_with("Nullable")>
+ sout: {
+ if (in.isSet == 0) {
+ // processing nullable input and the value is null, so don't do anything...
+ break sout;
+ }
+ </#if>
+ if (nonNullCount.value == 0) {
+ nonNullCount.value = 1;
+ } else {
+ throw org.apache.drill.common.exceptions.UserException.functionError()
+ .message("Input for single_value function has more than one row")
+ .build();
+ }
+ <#if type.major == "primitive">
+ value.value = in.value;
+ <#elseif type.major == "IntervalDay">
+ value.days = in.days;
+ value.milliseconds = in.milliseconds;
+ <#elseif type.major == "Interval">
+ value.days = in.days;
+ value.milliseconds = in.milliseconds;
+ value.months = in.months;
+ <#elseif type.major == "VarDecimal">
+ value.start = in.start;
+ value.end = in.end;
+ value.buffer = in.buffer;
+ value.scale = in.scale;
+ value.precision = in.precision;
+ <#elseif type.major == "bytes">
+ value.start = in.start;
+ value.end = in.end;
+ value.buffer = in.buffer;
+ </#if>
+ <#if type.inputType?starts_with("Nullable")>
+ } // end of sout block
+ </#if>
+ }
+
+ @Override
+ public void output() {
+ if (nonNullCount.value > 0) {
+ out.isSet = 1;
+ <#if type.major == "primitive">
+ out.value = value.value;
+ <#elseif type.major == "IntervalDay">
+ out.days = value.days;
+ out.milliseconds = value.milliseconds;
+ <#elseif type.major == "Interval">
+ out.days = value.days;
+ out.milliseconds = value.milliseconds;
+ out.months = value.months;
+ <#elseif type.major == "VarDecimal">
+ out.start = value.start;
+ out.end = value.end;
+ out.buffer = buffer.reallocIfNeeded(value.end - value.start);
+ out.buffer.writeBytes(value.buffer, value.start, value.end - value.start);
+ out.scale = value.scale;
+ out.precision = value.precision;
+ <#elseif type.major == "bytes">
+ out.start = value.start;
+ out.end = value.end;
+ out.buffer = buffer.reallocIfNeeded(value.end - value.start);
+ out.buffer.writeBytes(value.buffer, value.start, value.end - value.start);
+ </#if>
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ value = new ${type.runningType}Holder();
+ nonNullCount.value = 0;
+ }
+ }
+</#list>
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index e4dab91..b974537 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -17,9 +17,22 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.Util;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.calcite.rel.RelNode;
@@ -35,9 +48,11 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.resolver.TypeCastRules;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -220,7 +235,13 @@ public class JoinUtils {
if (currentrel instanceof DrillAggregateRel) {
agg = (DrillAggregateRel)currentrel;
} else if (currentrel instanceof RelSubset) {
- currentrel = ((RelSubset)currentrel).getBest() ;
+ currentrel = ((RelSubset) currentrel).getBest() ;
+ } else if (currentrel instanceof DrillLimitRel) {
+ // TODO: Improve this check when DRILL-5691 is fixed.
+ // The problem is that RelMdMaxRowCount currently cannot be used
+ // due to CALCITE-1048.
+ Integer fetchValue = ((RexLiteral) ((DrillLimitRel) currentrel).getFetch()).getValueAs(Integer.class);
+ return fetchValue != null && fetchValue <= 1;
} else if (currentrel.getInputs().size() == 1) {
// If the rel is not an aggregate or RelSubset, but is a single-input rel (could be Project,
// Filter, Sort etc.), check its input
@@ -234,6 +255,17 @@ public class JoinUtils {
if (agg.getGroupSet().isEmpty()) {
return true;
}
+ // Checks that the group by contains a single expression and that it is a literal.
+ // When Calcite rewrites EXISTS sub-queries using SubQueryRemoveRule rules,
+ // it creates a project with a TRUE literal in its expression list and an aggregate on top of it
+ // with an empty call list and the literal from the project expression in its group set.
+ if (agg.getAggCallList().isEmpty() && agg.getGroupSet().cardinality() == 1) {
+ ProjectExpressionsCollector expressionsCollector = new ProjectExpressionsCollector();
+ agg.accept(expressionsCollector);
+ List<RexNode> projectedExpressions = expressionsCollector.getProjectedExpressions();
+ return projectedExpressions.size() == 1
+ && RexUtil.isLiteral(projectedExpressions.get(agg.getGroupSet().nth(0)), true);
+ }
}
return false;
}
@@ -267,4 +299,75 @@ public class JoinUtils {
return isScalarSubquery(left) || isScalarSubquery(right);
}
+ /**
+ * Collects the list of expressions from the input project.
+ * If the input rel node has a single input, its input is used instead.
+ */
+ private static class ProjectExpressionsCollector extends RelShuttleImpl {
+ private final List<RexNode> expressions = new ArrayList<>();
+
+ @Override
+ public RelNode visit(RelNode other) {
+ // RelShuttleImpl doesn't have visit methods for Project and RelSubset.
+ if (other instanceof RelSubset) {
+ return visit((RelSubset) other);
+ } else if (other instanceof Project) {
+ return visit((Project) other);
+ }
+ return super.visit(other);
+ }
+
+ @Override
+ public RelNode visit(TableFunctionScan scan) {
+ return scan;
+ }
+
+ @Override
+ public RelNode visit(LogicalJoin join) {
+ return join;
+ }
+
+ @Override
+ public RelNode visit(LogicalCorrelate correlate) {
+ return correlate;
+ }
+
+ @Override
+ public RelNode visit(LogicalUnion union) {
+ return union;
+ }
+
+ @Override
+ public RelNode visit(LogicalIntersect intersect) {
+ return intersect;
+ }
+
+ @Override
+ public RelNode visit(LogicalMinus minus) {
+ return minus;
+ }
+
+ @Override
+ public RelNode visit(LogicalSort sort) {
+ return sort;
+ }
+
+ @Override
+ public RelNode visit(LogicalExchange exchange) {
+ return exchange;
+ }
+
+ private RelNode visit(Project project) {
+ expressions.addAll(project.getProjects());
+ return project;
+ }
+
+ private RelNode visit(RelSubset subset) {
+ return Util.first(subset.getBest(), subset.getOriginal()).accept(this);
+ }
+
+ public List<RexNode> getProjectedExpressions() {
+ return expressions;
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 2a79751..b78d76c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -113,6 +113,16 @@ public enum PlannerPhase {
}
},
+ SUBQUERY_REWRITE("Sub-queries rewrites") {
+ public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
+ return RuleSets.ofList(
+ RuleInstance.SUB_QUERY_FILTER_REMOVE_RULE,
+ RuleInstance.SUB_QUERY_PROJECT_REMOVE_RULE,
+ RuleInstance.SUB_QUERY_JOIN_REMOVE_RULE
+ );
+ }
+ },
+
LOGICAL_PRUNE("Logical Planning (with partition pruning)") {
public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
return PlannerPhase.mergedRuleSets(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 80bbe88..8aec96c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -40,6 +40,7 @@ import org.apache.calcite.rel.rules.ProjectToWindowRule;
import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
import org.apache.calcite.rel.rules.ReduceExpressionsRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.SubQueryRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.drill.exec.planner.logical.DrillConditions;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
@@ -130,4 +131,13 @@ public interface RuleInstance {
FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE =
new FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
+ SubQueryRemoveRule SUB_QUERY_FILTER_REMOVE_RULE =
+ new SubQueryRemoveRule.SubQueryFilterRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
+
+ SubQueryRemoveRule SUB_QUERY_PROJECT_REMOVE_RULE =
+ new SubQueryRemoveRule.SubQueryProjectRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
+
+ SubQueryRemoveRule SUB_QUERY_JOIN_REMOVE_RULE =
+ new SubQueryRemoveRule.SubQueryJoinRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
index f3c6ce0..cd696ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
@@ -32,8 +32,6 @@ import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.planner.sql.parser.DrillCalciteWrapperUtility;
import org.apache.drill.exec.util.ApproximateStringMatcher;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
@@ -46,7 +44,6 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlSingleValueAggFunction;
import org.apache.calcite.util.NlsString;
/**
@@ -79,19 +76,6 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
}
@Override
- public RelNode visit(LogicalAggregate aggregate) {
- for(AggregateCall aggregateCall : aggregate.getAggCallList()) {
- if(aggregateCall.getAggregation() instanceof SqlSingleValueAggFunction) {
- unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.FUNCTION,
- "Non-scalar sub-query used in an expression\n" +
- "See Apache Drill JIRA: DRILL-1937");
- throw new UnsupportedOperationException();
- }
- }
- return visitChild(aggregate, 0, aggregate.getInput());
- }
-
- @Override
public RelNode visit(LogicalProject project) {
final List<RexNode> projExpr = Lists.newArrayList();
for(RexNode rexNode : project.getChildExps()) {
@@ -168,7 +152,7 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
exprList.add(newExpr);
}
- if (rewrite == true) {
+ if (rewrite) {
LogicalProject newProject = project.copy(project.getTraitSet(), project.getInput(0), exprList, project.getRowType());
return visitChild(newProject, 0, project.getInput());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index b8659d1..3f65ad2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -64,7 +64,6 @@ import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.Util;
@@ -384,10 +383,7 @@ public class SqlConverter {
//To avoid unexpected column errors set a value of top to false
final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false);
final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
- final RelRoot rel3 = rel2.withRel(
- RelDecorrelator.decorrelateQuery(rel2.rel,
- sqlToRelConverterConfig.getRelBuilderFactory().create(cluster, null)));
- return rel3;
+ return rel2;
}
private class Expander implements RelOptTable.ViewExpander {
@@ -478,7 +474,7 @@ public class SqlConverter {
@Override
public boolean isExpand() {
- return SqlToRelConverterConfig.DEFAULT.isExpand();
+ return false;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 98e017f..1e671ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -55,6 +55,7 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;
import org.apache.calcite.tools.RelConversionException;
@@ -78,6 +79,7 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.cost.DrillDefaultRelMetadataProvider;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.DrillStoreRel;
import org.apache.drill.exec.planner.logical.PreProcessLogicalRel;
@@ -658,10 +660,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
return typedSqlNode;
}
- private RelNode convertToRel(SqlNode node) throws RelConversionException {
+ private RelNode convertToRel(SqlNode node) {
final RelNode convertedNode = config.getConverter().toRel(node).rel;
log("INITIAL", convertedNode, logger, null);
- return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, convertedNode);
+ RelNode transformedNode = transform(PlannerType.HEP,
+ PlannerPhase.SUBQUERY_REWRITE, convertedNode);
+
+ RelNode decorrelatedNode = RelDecorrelator.decorrelateQuery(transformedNode,
+ DrillRelFactories.LOGICAL_BUILDER.create(transformedNode.getCluster(), null));
+
+ return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, decorrelatedNode);
}
private RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
index 47f0ae8..d40540e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
@@ -58,4 +58,50 @@ public class TestCorrelation extends PlanTestBase {
.run();
}
+ @Test
+ public void testExistsScalarSubquery() throws Exception {
+ String query =
+ "SELECT employee_id\n" +
+ "FROM cp.`employee.json`\n" +
+ "WHERE EXISTS\n" +
+ " (SELECT *\n" +
+ " FROM cp.`employee.json` cs2\n" +
+ " )\n" +
+ "limit 1";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("employee_id")
+ .baselineValues(1L)
+ .go();
+ }
+
+ @Test
+ public void testSeveralExistsCorrelateSubquery() throws Exception {
+ String query =
+ "SELECT cs1.employee_id\n" +
+ "FROM cp.`employee.json` cs1,\n" +
+ " cp.`employee.json` cs3\n" +
+ "WHERE cs1.hire_date = cs3.hire_date\n" +
+ " AND EXISTS\n" +
+ " (SELECT *\n" +
+ " FROM cp.`employee.json` cs2\n" +
+ " WHERE " +
+ " cs1.position_id > cs2.position_id\n" +
+ " AND" +
+ " cs1.epmloyee_id = cs2.epmloyee_id" +
+ " )\n" +
+ " AND EXISTS\n" +
+ " (SELECT *\n" +
+ " FROM cp.`employee.json` cr1\n" +
+ " WHERE cs1.position_id = cr1.position_id)\n" +
+ "LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .expectsEmptyResultSet()
+ .go();
+ }
}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index 781fce3..679d01e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -47,30 +47,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
throw ex;
}
- @Test(expected = UnsupportedFunctionException.class) // see DRILL-1937
- public void testDisabledExplainplanForComparisonWithNonscalarSubquery() throws Exception {
- try {
- test("explain plan for select n_name from cp.`tpch/nation.parquet` " +
- "where n_nationkey = " +
- "(select r_regionkey from cp.`tpch/region.parquet` " +
- "where r_regionkey = 1)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
- @Test(expected = UnsupportedFunctionException.class) // see DRILL-1937
- public void testDisabledComparisonWithNonscalarSubquery() throws Exception {
- try {
- test("select n_name from cp.`tpch/nation.parquet` " +
- "where n_nationkey = " +
- "(select r_regionkey from cp.`tpch/region.parquet` " +
- "where r_regionkey = 1)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
@Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
public void testDisabledIntersect() throws Exception {
try {
@@ -215,19 +191,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
}
}
- @Test(expected = UnsupportedFunctionException.class) // see DRILL-1325, DRILL-2155, see DRILL-1937
- public void testMultipleUnsupportedOperatorations() throws Exception {
- try {
- test("select a.lastname, b.n_name " +
- "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " +
- "where b.n_nationkey = " +
- "(select r_regionkey from cp.`tpch/region.parquet` " +
- "where r_regionkey = 1)");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
@Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2068, DRILL-1325
public void testExplainPlanForCartesianJoin() throws Exception {
try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 3e17a7a..adc8e35 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1165,4 +1165,37 @@ public class TestExampleQueries extends BaseTestQuery {
.build()
.run();
}
+
+ @Test
+ public void testComparisonWithSingleValueSubQuery() throws Exception {
+ String query = "select n_name from cp.`tpch/nation.parquet` " +
+ "where n_nationkey = " +
+ "(select r_regionkey from cp.`tpch/region.parquet` " +
+ "where r_regionkey = 1)";
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]{"agg.*SINGLE_VALUE", "Filter.*=\\(\\$0, 1\\)"});
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("n_name")
+ .baselineValues("ARGENTINA")
+ .go();
+ }
+
+ @Test
+ public void testMultipleComparisonWithSingleValueSubQuery() throws Exception {
+ String query = "select a.last_name, b.n_name " +
+ "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " +
+ "where b.n_nationkey = " +
+ "(select r_regionkey from cp.`tpch/region.parquet` " +
+ "where r_regionkey = 1) limit 1";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("last_name", "n_name")
+ .baselineValues("Nowmer", "ARGENTINA")
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
index 54f7250..fd97564 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
@@ -138,7 +138,6 @@ public class TestTpchDistributed extends BaseTestQuery {
}
@Test
- @Ignore
public void tpch21() throws Exception{
testDistributed("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
index 56287c9..ed242be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
@@ -145,7 +145,6 @@ public class TestTpchDistributedStreaming extends BaseTestQuery {
}
@Test
- @Ignore
public void tpch21() throws Exception{
testDistributed("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
index 3cbe5ef..1aa9de2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
@@ -148,7 +148,6 @@ public class TestTpchExplain extends BaseTestQuery {
}
@Test
- @Ignore
public void tpch21() throws Exception{
doExplain("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
index 9400c7c..51aea18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -140,7 +140,6 @@ public class TestTpchLimit0 extends BaseTestQuery {
}
@Test
- @Ignore
public void tpch21() throws Exception{
testLimitZero("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
index 1745301..f995ce6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
@@ -141,7 +141,6 @@ public class TestTpchPlanning extends PlanningBase {
}
@Test
- @Ignore // DRILL-519
public void tpch21() throws Exception {
testSqlPlanFromFile("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index 2863bbb..a3e34c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -139,7 +139,6 @@ public class TestTpchSingleMode extends BaseTestQuery {
}
@Test
- @Ignore
public void tpch21() throws Exception{
testSingleMode("queries/tpch/21.sql");
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 0a5b7cd..445b18e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -25,6 +25,9 @@ import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.Text;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.PlanTestBase;
@@ -37,23 +40,32 @@ import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.BeforeClass;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.math.BigDecimal;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({SqlFunctionTest.class, OperatorTest.class, PlannerTest.class})
public class TestAggregateFunctions extends BaseTestQuery {
+ @Rule // JUnit rule used by testSingleValueWithMultipleValuesInput to declare the expected exception type and message
+ public ExpectedException thrown = ExpectedException.none(); // none(): no exception is expected until a test calls thrown.expect(...)
+
@BeforeClass
public static void setupFiles() {
dirTestWatcher.copyResourceToRoot(Paths.get("agg"));
@@ -521,6 +533,7 @@ public class TestAggregateFunctions extends BaseTestQuery {
.go();
}
+
@Test
public void minMaxEmptyNonNullableInput() throws Exception {
// test min and max functions on required type
@@ -567,6 +580,73 @@ public class TestAggregateFunctions extends BaseTestQuery {
}
}
+ @Test // For each table: reads one row, then checks that single_value(col) over a one-row sub-query yields that row's values.
+ public void testSingleValueFunction() throws Exception {
+ List<String> tableNames = ImmutableList.of(
+ "cp.`parquet/alltypes_required.parquet`",
+ "cp.`parquet/alltypes_optional.parquet`");
+ for (String tableName : tableNames) {
+ final QueryDataBatch result = // first batch returned by a one-row sample query; its values become the baseline
+ testSqlWithResults(String.format("select * from %s limit 1", tableName)).get(0);
+
+ final Map<String, StringBuilder> functions = new HashMap<>(); // function name -> select list being built for it
+ functions.put("single_value", new StringBuilder());
+
+ final Map<String, Object> resultingValues = new HashMap<>(); // back-quoted column name -> value read at index 0
+ final List<String> columns = new ArrayList<>(); // plain column names, in container iteration order
+
+ final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ loader.load(result.getHeader().getDef(), result.getData());
+
+ for (VectorWrapper<?> vectorWrapper : loader.getContainer()) {
+ final String fieldName = vectorWrapper.getField().getName();
+ Object object = vectorWrapper.getValueVector().getAccessor().getObject(0);
+ // VarCharVector returns Text instance, but baseline values should contain String value
+ if (object instanceof Text) {
+ object = object.toString();
+ }
+ resultingValues.put(String.format("`%s`", fieldName), object); // NOTE(review): key is back-quoted while the SQL below uses the bare name; presumably the form baselineRecords expects - confirm
+ for (Map.Entry<String, StringBuilder> function : functions.entrySet()) {
+ function.getValue() // appends function(field) followed by field as alias and a comma separator
+ .append(function.getKey())
+ .append("(")
+ .append(fieldName)
+ .append(") ")
+ .append(fieldName)
+ .append(",");
+ }
+ columns.add(fieldName);
+ }
+ loader.clear(); // NOTE(review): not inside a finally block, so this and the release below are skipped if anything above throws
+ result.release();
+
+ String columnsList = columns.stream() // comma-separated column names for the inner select
+ .collect(Collectors.joining(", "));
+
+ final List<Map<String, Object>> baselineRecords = new ArrayList<>(); // exactly one expected record
+ baselineRecords.add(resultingValues);
+
+ for (StringBuilder selectBody : functions.values()) {
+ selectBody.setLength(selectBody.length() - 1); // drops the trailing comma left by the append loop above
+
+ testBuilder()
+ .sqlQuery("select %s from (select %s from %s limit 1)", // the inner limit 1 keeps the aggregate input to a single row
+ selectBody.toString(), columnsList, tableName)
+ .unOrdered()
+ .baselineRecords(baselineRecords)
+ .go();
+ }
+ }
+ }
+
+ @Test // Negative case: single_value applied to a column with no LIMIT or filter is expected to fail.
+ public void testSingleValueWithMultipleValuesInput() throws Exception {
+ thrown.expect(UserRemoteException.class); // expectations must be registered before the failing call below
+ thrown.expectMessage(containsString("FUNCTION ERROR"));
+ thrown.expectMessage(containsString("Input for single_value function has more than one row"));
+ test("select single_value(n_name) from cp.`tpch/nation.parquet`"); // NOTE(review): assumes the nation table holds more than one row - confirm
+ }
+
/*
* Streaming agg on top of a filter produces wrong results if the first two batches are filtered out.
* In the below test we have three files in the input directory and since the ordering of reading
diff --git a/pom.xml b/pom.xml
index 242b134..f69288e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
<dep.guava.version>18.0</dep.guava.version>
<forkCount>2</forkCount>
<parquet.version>1.10.0</parquet.version>
- <calcite.version>1.16.0-drill-r3</calcite.version>
+ <calcite.version>1.16.0-drill-r4</calcite.version>
<avatica.version>1.11.0</avatica.version>
<janino.version>2.7.6</janino.version>
<sqlline.version>1.1.9-drill-r7</sqlline.version>