You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/04/07 22:36:14 UTC
[3/7] incubator-calcite git commit: [CALCITE-366] Support Aggregate
push down in bushy joins (Jesus Camacho Rodriguez)
[CALCITE-366] Support Aggregate push down in bushy joins (Jesus Camacho Rodriguez)
Fix up (Julian Hyde):
* replace pointer comparison with RexNode.isAlwaysTrue;
* add to the list of built-in rules, and add SQL test.
Close apache/incubator-calcite#75
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/e48c7627
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/e48c7627
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/e48c7627
Branch: refs/heads/r1.2
Commit: e48c7627c162726f42e786a0e0b8c3042315491a
Parents: 5f57f78
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Tue Apr 7 15:35:01 2015 +0100
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Apr 7 09:41:53 2015 -0700
----------------------------------------------------------------------
.../calcite/plan/volcano/VolcanoPlanner.java | 2 +
.../rel/rules/AggregateJoinTransposeRule.java | 131 +++++++++++++++++++
.../apache/calcite/test/RelOptRulesTest.java | 63 +++++++++
.../org/apache/calcite/test/RelOptRulesTest.xml | 120 +++++++++++++++++
core/src/test/resources/sql/join.oq | 24 ++++
5 files changed, 340 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
index 06bff19..42d1b56 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
@@ -50,6 +50,7 @@ import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.rel.rules.FilterJoinRule;
@@ -1068,6 +1069,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
addRule(AggregateRemoveRule.INSTANCE);
addRule(UnionToDistinctRule.INSTANCE);
addRule(ProjectRemoveRule.INSTANCE);
+ addRule(AggregateJoinTransposeRule.INSTANCE);
addRule(CalcRemoveRule.INSTANCE);
addRule(SortRemoveRule.INSTANCE);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
new file mode 100644
index 0000000..3d310a5
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Planner rule that pushes an
+ * {@link org.apache.calcite.rel.core.Aggregate}
+ * past a {@link org.apache.calcite.rel.core.Join}.
+ *
+ * <p>The rule fires only when all of the following hold:
+ * <ul>
+ * <li>the aggregate has no aggregate functions, i.e. it merely eliminates
+ *     duplicate rows;
+ * <li>the join is an inner join;
+ * <li>the join condition consists solely of equi-join predicates between
+ *     the two inputs;
+ * <li>every column referenced by the join condition is one of the
+ *     aggregate's group columns.
+ * </ul>
+ *
+ * <p>The aggregate is replaced by a join of two aggregates, one on each join
+ * input, each grouping on the group columns that come from its own side.
+ */
+public class AggregateJoinTransposeRule extends RelOptRule {
+  /** Instance of the rule that matches {@link LogicalAggregate} on
+   * {@link LogicalJoin} and uses the default factories. */
+  public static final AggregateJoinTransposeRule INSTANCE =
+      new AggregateJoinTransposeRule(LogicalAggregate.class,
+          RelFactories.DEFAULT_AGGREGATE_FACTORY,
+          LogicalJoin.class,
+          RelFactories.DEFAULT_JOIN_FACTORY);
+
+  /** Creates the aggregates that are placed below the join. */
+  private final RelFactories.AggregateFactory aggregateFactory;
+
+  /** Creates the join that replaces the original aggregate. */
+  private final RelFactories.JoinFactory joinFactory;
+
+  /**
+   * Creates an AggregateJoinTransposeRule.
+   *
+   * @param aggregateClass   class of aggregate to match; only instances
+   *                         accepted by {@code Aggregate.IS_SIMPLE} match
+   * @param aggregateFactory factory used to build the pushed-down aggregates
+   * @param joinClass        class of join to match as the aggregate's input
+   * @param joinFactory      factory used to build the new top-level join
+   */
+  public AggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass,
+      RelFactories.AggregateFactory aggregateFactory,
+      Class<? extends Join> joinClass,
+      RelFactories.JoinFactory joinFactory) {
+    super(
+        operand(aggregateClass, null, Aggregate.IS_SIMPLE,
+            operand(joinClass, any())));
+    this.aggregateFactory = aggregateFactory;
+    this.joinFactory = joinFactory;
+  }
+
+  /**
+   * Replaces {@code Aggregate(Join(left, right))} with
+   * {@code Join(Aggregate(left), Aggregate(right))} when the preconditions
+   * described on the class are met; otherwise does nothing.
+   *
+   * @param call rule call whose rel 0 is the aggregate and rel 1 the join
+   */
+  @Override public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+    final Join join = call.rel(1);
+
+    // Only pure duplicate elimination is pushed down; bail out as soon as
+    // any aggregate function is present.
+    if (!aggregate.getAggCallList().isEmpty()) {
+      return;
+    }
+
+    // Only inner joins are handled.
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return;
+    }
+
+    // Every column used by the join condition must be a group column,
+    // otherwise the condition could not be evaluated above the new
+    // aggregates.
+    final ImmutableBitSet aggregateColumns = aggregate.getGroupSet();
+    final ImmutableBitSet joinColumns =
+        RelOptUtil.InputFinder.bits(join.getCondition());
+    if (!aggregateColumns.contains(joinColumns)) {
+      return;
+    }
+
+    // Split the join condition. The key lists are required out-parameters;
+    // only the non-equi remainder is inspected here.
+    final List<Integer> leftKeys = Lists.newArrayList();
+    final List<Integer> rightKeys = Lists.newArrayList();
+    final RexNode nonEquiConj =
+        RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(),
+            join.getCondition(), leftKeys, rightKeys);
+    // If it contains non-equi join conditions, we bail out
+    if (!nonEquiConj.isAlwaysTrue()) {
+      return;
+    }
+
+    // Distribute the group columns over the two join inputs. Grouping on
+    // the join keys alone would silently drop any additional group column,
+    // so each side groups on every group column that originates from it.
+    // Columns of the right input are re-based to that input's ordinals.
+    final int leftFieldCount = join.getLeft().getRowType().getFieldCount();
+    final List<Integer> leftGroupColumns = Lists.newArrayList();
+    final List<Integer> rightGroupColumns = Lists.newArrayList();
+    for (int column : aggregateColumns) {
+      if (column < leftFieldCount) {
+        leftGroupColumns.add(column);
+      } else {
+        rightGroupColumns.add(column - leftFieldCount);
+      }
+    }
+
+    // Create new aggregate operators below join. The aggregate call list is
+    // known to be empty at this point.
+    final RelNode newLeftInput =
+        aggregateFactory.createAggregate(join.getLeft(), false,
+            ImmutableBitSet.of(leftGroupColumns), null,
+            aggregate.getAggCallList());
+    final RelNode newRightInput =
+        aggregateFactory.createAggregate(join.getRight(), false,
+            ImmutableBitSet.of(rightGroupColumns), null,
+            aggregate.getAggCallList());
+
+    // Re-target the condition: a field of the old join becomes the field at
+    // its position within the group set, which is exactly its ordinal in the
+    // output of the new join (left group columns followed by right ones).
+    final Mappings.TargetMapping mapping = Mappings.target(
+        new Function<Integer, Integer>() {
+          public Integer apply(Integer a0) {
+            return aggregateColumns.indexOf(a0);
+          }
+        },
+        join.getRowType().getFieldCount(),
+        aggregateColumns.cardinality());
+    final RexNode newCondition =
+        RexUtil.apply(mapping, join.getCondition());
+
+    // Create new join
+    final RelNode newJoin =
+        joinFactory.createJoin(newLeftInput, newRightInput, newCondition,
+            join.getJoinType(), join.getVariablesStopped(),
+            join.isSemiJoinDone());
+
+    call.transformTo(newJoin);
+  }
+}
+
+// End AggregateJoinTransposeRule.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index 5f9aa29..3663e4f 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
import org.apache.calcite.rel.rules.AggregateFilterTransposeRule;
+import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
@@ -1437,6 +1438,68 @@ public class RelOptRulesTest extends RelOptTestBase {
+ " from emp) e1\n"
+ "where r < 2");
}
+
+ /** Plans a duplicate-eliminating aggregate over an inner equi-join whose
+  * condition columns are exactly the group columns. */
+ @Test public void testPushAggregateThroughJoin1() throws Exception {
+   final String sql = "select e.empno,d.deptno \n"
+       + "from (select * from sales.emp where empno = 10) as e "
+       + "join sales.dept as d on e.empno = d.deptno "
+       + "group by e.empno,d.deptno";
+   // Pre-program: merge the projection into the aggregate first.
+   final HepProgram mergeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateProjectMergeRule.INSTANCE)
+           .build();
+   // Program under test: only the aggregate/join transpose rule.
+   final HepProgram transposeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateJoinTransposeRule.INSTANCE)
+           .build();
+   final HepPlanner planner = new HepPlanner(transposeProgram);
+   checkPlanning(tester, mergeProgram, planner, sql);
+ }
+
+ /** Plans an aggregate over a join whose condition also compares computed
+  * expressions that are not among the group columns. */
+ @Test public void testPushAggregateThroughJoin2() throws Exception {
+   final String sql = "select e.empno,d.deptno \n"
+       + "from (select * from sales.emp where empno = 10) as e "
+       + "join sales.dept as d on e.empno = d.deptno "
+       + "and e.deptno + e.empno = d.deptno + 5 "
+       + "group by e.empno,d.deptno";
+   // Pre-program: merge the projection into the aggregate first.
+   final HepProgram mergeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateProjectMergeRule.INSTANCE)
+           .build();
+   // Program under test: only the aggregate/join transpose rule.
+   final HepProgram transposeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateJoinTransposeRule.INSTANCE)
+           .build();
+   final HepPlanner planner = new HepPlanner(transposeProgram);
+   checkPlanning(tester, mergeProgram, planner, sql);
+ }
+
+ /** Plans an aggregate over a join with a non-equi ({@code <})
+  * condition. */
+ @Test public void testPushAggregateThroughJoin3() throws Exception {
+   final String sql = "select e.empno,d.deptno \n"
+       + "from (select * from sales.emp where empno = 10) as e "
+       + "join sales.dept as d on e.empno < d.deptno "
+       + "group by e.empno,d.deptno";
+   // Pre-program: merge the projection into the aggregate first.
+   final HepProgram mergeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateProjectMergeRule.INSTANCE)
+           .build();
+   // Program under test: only the aggregate/join transpose rule.
+   final HepProgram transposeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateJoinTransposeRule.INSTANCE)
+           .build();
+   final HepPlanner planner = new HepPlanner(transposeProgram);
+   checkPlanning(tester, mergeProgram, planner, sql);
+ }
+
+ /** Plans an aggregate that carries an aggregate function ({@code sum})
+  * over an inner equi-join. */
+ @Test public void testPushAggregateThroughJoin4() throws Exception {
+   final String sql = "select e.empno,sum(sal) \n"
+       + "from (select * from sales.emp where empno = 10) as e "
+       + "join sales.dept as d on e.empno = d.deptno "
+       + "group by e.empno,d.deptno";
+   // Pre-program: merge the projection into the aggregate first.
+   final HepProgram mergeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateProjectMergeRule.INSTANCE)
+           .build();
+   // Program under test: only the aggregate/join transpose rule.
+   final HepProgram transposeProgram =
+       new HepProgramBuilder()
+           .addRuleInstance(AggregateJoinTransposeRule.INSTANCE)
+           .build();
+   final HepPlanner planner = new HepPlanner(transposeProgram);
+   checkPlanning(tester, mergeProgram, planner, sql);
+ }
+
}
// End RelOptRulesTest.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index 60bd03e..83154af 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -3359,4 +3359,124 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
]]>
</Resource>
</TestCase>
+ <TestCase name="testPushAggregateThroughJoin1">
+ <Resource name="sql">
+ <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ group by e.empno,d.deptno]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+ LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalJoin(condition=[=($0, $1)], joinType=[inner])
+ LogicalAggregate(group=[{0}])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalAggregate(group=[{0}])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPushAggregateThroughJoin2">
+ <Resource name="sql">
+ <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ and e.deptno + e.empno = d.deptno + 5
+ group by e.empno,d.deptno]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0, 10}])
+ LogicalJoin(condition=[AND(=($0, $10), =($9, $12))], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f9=[+($7, $0)])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalProject(DEPTNO=[$0], NAME=[$1], $f2=[+($0, 5)])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0, 10}])
+ LogicalJoin(condition=[AND(=($0, $10), =($9, $12))], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f9=[+($7, $0)])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalProject(DEPTNO=[$0], NAME=[$1], $f2=[+($0, 5)])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPushAggregateThroughJoin3">
+ <Resource name="sql">
+ <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno < d.deptno
+ group by e.empno,d.deptno]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+ LogicalJoin(condition=[<($0, $9)], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+ LogicalJoin(condition=[<($0, $9)], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPushAggregateThroughJoin4">
+ <Resource name="sql">
+ <![CDATA[select e.empno,sum(sal)
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ group by e.empno,d.deptno]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$2])
+ LogicalAggregate(group=[{0, 9}], EXPR$1=[SUM($5)])
+ LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$2])
+ LogicalAggregate(group=[{0, 9}], EXPR$1=[SUM($5)])
+ LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+ LogicalFilter(condition=[=($0, 10)])
+ LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+ LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+ </Resource>
+ </TestCase>
</Root>
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/resources/sql/join.oq
----------------------------------------------------------------------
diff --git a/core/src/test/resources/sql/join.oq b/core/src/test/resources/sql/join.oq
index 1bbb48c..d354ead 100644
--- a/core/src/test/resources/sql/join.oq
+++ b/core/src/test/resources/sql/join.oq
@@ -110,4 +110,28 @@ EnumerableCalc(expr#0..5=[{inputs}], proj#0..2=[{exprs}], DEPTNO0=[$t4], DNAME=[
EnumerableValues(tuples=[[{ 10, 'Sales ' }, { 20, 'Marketing ' }, { 30, 'Engineering' }, { 40, 'Empty ' }]])
!plan
+!use scott
+
+# Push aggregate through join
+select distinct dept.deptno, emp.deptno
+from "scott".emp join "scott".dept using (deptno);
++--------+--------+
+| DEPTNO | DEPTNO |
++--------+--------+
+| 10 | 10 |
+| 20 | 20 |
+| 30 | 30 |
++--------+--------+
+(3 rows)
+
+!ok
+EnumerableJoin(condition=[=($0, $1)], joinType=[inner])
+ EnumerableAggregate(group=[{0}])
+ EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+ EnumerableTableScan(table=[[scott, DEPT]])
+ EnumerableAggregate(group=[{0}])
+ EnumerableCalc(expr#0..7=[{inputs}], DEPTNO=[$t7])
+ EnumerableTableScan(table=[[scott, EMP]])
+!plan
+
# End join.oq