You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/04/07 22:36:14 UTC

[3/7] incubator-calcite git commit: [CALCITE-366] Support Aggregate push down in bushy joins (Jesus Camacho Rodriguez)

[CALCITE-366] Support Aggregate push down in bushy joins (Jesus Camacho Rodriguez)

Fix up (Julian Hyde):
* replace pointer comparison with RexNode.isAlwaysTrue;
* add to the list of built-in rules, and add SQL test.

Close apache/incubator-calcite#75


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/e48c7627
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/e48c7627
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/e48c7627

Branch: refs/heads/r1.2
Commit: e48c7627c162726f42e786a0e0b8c3042315491a
Parents: 5f57f78
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Tue Apr 7 15:35:01 2015 +0100
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Apr 7 09:41:53 2015 -0700

----------------------------------------------------------------------
 .../calcite/plan/volcano/VolcanoPlanner.java    |   2 +
 .../rel/rules/AggregateJoinTransposeRule.java   | 131 +++++++++++++++++++
 .../apache/calcite/test/RelOptRulesTest.java    |  63 +++++++++
 .../org/apache/calcite/test/RelOptRulesTest.xml | 120 +++++++++++++++++
 core/src/test/resources/sql/join.oq             |  24 ++++
 5 files changed, 340 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
index 06bff19..42d1b56 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
@@ -50,6 +50,7 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
 import org.apache.calcite.rel.rules.AggregateRemoveRule;
 import org.apache.calcite.rel.rules.CalcRemoveRule;
 import org.apache.calcite.rel.rules.FilterJoinRule;
@@ -1068,6 +1069,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     addRule(AggregateRemoveRule.INSTANCE);
     addRule(UnionToDistinctRule.INSTANCE);
     addRule(ProjectRemoveRule.INSTANCE);
+    addRule(AggregateJoinTransposeRule.INSTANCE);
     addRule(CalcRemoveRule.INSTANCE);
     addRule(SortRemoveRule.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
new file mode 100644
index 0000000..3d310a5
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateJoinTransposeRule.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Planner rule that pushes an
+ * {@link org.apache.calcite.rel.core.Aggregate}
+ * past a {@link org.apache.calcite.rel.core.Join}.
+ */
+public class AggregateJoinTransposeRule extends RelOptRule {
+  public static final AggregateJoinTransposeRule INSTANCE =
+      new AggregateJoinTransposeRule(LogicalAggregate.class,
+          RelFactories.DEFAULT_AGGREGATE_FACTORY,
+          LogicalJoin.class,
+          RelFactories.DEFAULT_JOIN_FACTORY);
+
+  private final RelFactories.AggregateFactory aggregateFactory;
+
+  private final RelFactories.JoinFactory joinFactory;
+
+  /** Creates an AggregateJoinTransposeRule. */
+  public AggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass,
+      RelFactories.AggregateFactory aggregateFactory,
+      Class<? extends Join> joinClass,
+      RelFactories.JoinFactory joinFactory) {
+    super(
+        operand(aggregateClass, null, Aggregate.IS_SIMPLE,
+            operand(joinClass, any())));
+    this.aggregateFactory = aggregateFactory;
+    this.joinFactory = joinFactory;
+  }
+
+  public void onMatch(RelOptRuleCall call) {
+    Aggregate aggregate = call.rel(0);
+    Join join = call.rel(1);
+
+    // If aggregate functions are present, we bail out
+    if (!aggregate.getAggCallList().isEmpty()) {
+      return;
+    }
+
+    // If it is not an inner join, we do not push the
+    // aggregate operator
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return;
+    }
+
+    // Do the columns used by the join appear in the output of the aggregate?
+    final ImmutableBitSet aggregateColumns = aggregate.getGroupSet();
+    final ImmutableBitSet joinColumns =
+        RelOptUtil.InputFinder.bits(join.getCondition());
+    boolean allColumnsInAggregate = aggregateColumns.contains(joinColumns);
+    if (!allColumnsInAggregate) {
+      return;
+    }
+
+    // Split join condition
+    final List<Integer> leftKeys = Lists.newArrayList();
+    final List<Integer> rightKeys = Lists.newArrayList();
+    RexNode nonEquiConj =
+        RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(),
+            join.getCondition(), leftKeys, rightKeys);
+    // If it contains non-equi join conditions, we bail out
+    if (!nonEquiConj.isAlwaysTrue()) {
+      return;
+    }
+
+    // Create new aggregate operators below join
+    final ImmutableBitSet leftKeysBitSet = ImmutableBitSet.of(leftKeys);
+    RelNode newLeftInput = aggregateFactory.createAggregate(join.getLeft(),
+        false, leftKeysBitSet, null, aggregate.getAggCallList());
+    final ImmutableBitSet rightKeysBitSet = ImmutableBitSet.of(rightKeys);
+    RelNode newRightInput = aggregateFactory.createAggregate(join.getRight(),
+        false, rightKeysBitSet, null, aggregate.getAggCallList());
+
+    // Update condition
+    final Mappings.TargetMapping mapping = Mappings.target(
+        new Function<Integer, Integer>() {
+          public Integer apply(Integer a0) {
+            return aggregateColumns.indexOf(a0);
+          }
+        },
+        join.getRowType().getFieldCount(),
+        aggregateColumns.cardinality());
+    final RexNode newCondition =
+        RexUtil.apply(mapping, join.getCondition());
+
+    // Create new join
+    RelNode newJoin = joinFactory.createJoin(newLeftInput, newRightInput,
+        newCondition, join.getJoinType(),
+        join.getVariablesStopped(), join.isSemiJoinDone());
+
+    call.transformTo(newJoin);
+  }
+}
+
+// End AggregateJoinTransposeRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index 5f9aa29..3663e4f 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.rules.AggregateFilterTransposeRule;
+import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
 import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
 import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
 import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
@@ -1437,6 +1438,68 @@ public class RelOptRulesTest extends RelOptTestBase {
         + "  from emp) e1\n"
         + "where r < 2");
   }
+
+  @Test public void testPushAggregateThroughJoin1() throws Exception {
+    checkPushAggregateThroughJoin("select e.empno,d.deptno \n"
+        + "from (select * from sales.emp where empno = 10) as e "
+        + "join sales.dept as d on e.empno = d.deptno "
+        + "group by e.empno,d.deptno");
+  }
+
+  @Test public void testPushAggregateThroughJoin2() throws Exception {
+    checkPushAggregateThroughJoin("select e.empno,d.deptno \n"
+        + "from (select * from sales.emp where empno = 10) as e "
+        + "join sales.dept as d on e.empno = d.deptno "
+        + "and e.deptno + e.empno = d.deptno + 5 "
+        + "group by e.empno,d.deptno");
+  }
+
+  @Test public void testPushAggregateThroughJoin3() throws Exception {
+    checkPushAggregateThroughJoin("select e.empno,d.deptno \n"
+        + "from (select * from sales.emp where empno = 10) as e "
+        + "join sales.dept as d on e.empno < d.deptno "
+        + "group by e.empno,d.deptno");
+  }
+
+  @Test public void testPushAggregateThroughJoin4() throws Exception {
+    checkPushAggregateThroughJoin("select e.empno,sum(sal) \n"
+        + "from (select * from sales.emp where empno = 10) as e "
+        + "join sales.dept as d on e.empno = d.deptno "
+        + "group by e.empno,d.deptno");
+  }
+
+  /** Shared driver for the {@code testPushAggregateThroughJoin*} tests.
+   *
+   * <p>Calls {@code checkPlanning} with {@link AggregateProjectMergeRule} as
+   * the pre-program and a {@link HepPlanner} that applies
+   * {@link AggregateJoinTransposeRule}.
+   *
+   * <p>The name deliberately does not start with "test", so that the calling
+   * test method is still the one found on the call stack. TODO(review):
+   * confirm against the reference-file lookup.
+   *
+   * @param sql query to plan
+   */
+  private void checkPushAggregateThroughJoin(String sql) throws Exception {
+    final HepProgram preProgram = new HepProgramBuilder()
+        .addRuleInstance(AggregateProjectMergeRule.INSTANCE)
+        .build();
+    final HepProgram program = new HepProgramBuilder()
+        .addRuleInstance(AggregateJoinTransposeRule.INSTANCE)
+        .build();
+    checkPlanning(tester, preProgram, new HepPlanner(program), sql);
+  }
+
 }
 
 // End RelOptRulesTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index 60bd03e..83154af 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -3359,4 +3359,124 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testPushAggregateThroughJoin1">
+        <Resource name="sql">
+            <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ group by e.empno,d.deptno]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+  LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalJoin(condition=[=($0, $1)], joinType=[inner])
+  LogicalAggregate(group=[{0}])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+  LogicalAggregate(group=[{0}])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testPushAggregateThroughJoin2">
+        <Resource name="sql">
+            <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ and e.deptno + e.empno = d.deptno + 5
+ group by e.empno,d.deptno]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalAggregate(group=[{0, 10}])
+  LogicalJoin(condition=[AND(=($0, $10), =($9, $12))], joinType=[inner])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f9=[+($7, $0)])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+        LogicalFilter(condition=[=($0, 10)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0], NAME=[$1], $f2=[+($0, 5)])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalAggregate(group=[{0, 10}])
+  LogicalJoin(condition=[AND(=($0, $10), =($9, $12))], joinType=[inner])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f9=[+($7, $0)])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+        LogicalFilter(condition=[=($0, 10)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0], NAME=[$1], $f2=[+($0, 5)])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testPushAggregateThroughJoin3">
+        <Resource name="sql">
+            <![CDATA[select e.empno,d.deptno
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno < d.deptno
+ group by e.empno,d.deptno]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+  LogicalJoin(condition=[<($0, $9)], joinType=[inner])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalAggregate(group=[{0, 9}])
+  LogicalJoin(condition=[<($0, $9)], joinType=[inner])
+    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+      LogicalFilter(condition=[=($0, 10)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testPushAggregateThroughJoin4">
+        <Resource name="sql">
+            <![CDATA[select e.empno,sum(sal)
+ from (select * from sales.emp where empno = 10) as e
+ join sales.dept as d on e.empno = d.deptno
+ group by e.empno,d.deptno]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$2])
+  LogicalAggregate(group=[{0, 9}], EXPR$1=[SUM($5)])
+    LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+        LogicalFilter(condition=[=($0, 10)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$2])
+  LogicalAggregate(group=[{0, 9}], EXPR$1=[SUM($5)])
+    LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
+        LogicalFilter(condition=[=($0, 10)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
 </Root>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e48c7627/core/src/test/resources/sql/join.oq
----------------------------------------------------------------------
diff --git a/core/src/test/resources/sql/join.oq b/core/src/test/resources/sql/join.oq
index 1bbb48c..d354ead 100644
--- a/core/src/test/resources/sql/join.oq
+++ b/core/src/test/resources/sql/join.oq
@@ -110,4 +110,28 @@ EnumerableCalc(expr#0..5=[{inputs}], proj#0..2=[{exprs}], DEPTNO0=[$t4], DNAME=[
     EnumerableValues(tuples=[[{ 10, 'Sales      ' }, { 20, 'Marketing  ' }, { 30, 'Engineering' }, { 40, 'Empty      ' }]])
 !plan
 
+!use scott
+
+# Push aggregate through join
+select distinct dept.deptno, emp.deptno
+from "scott".emp join "scott".dept using (deptno);
++--------+--------+
+| DEPTNO | DEPTNO |
++--------+--------+
+|     10 |     10 |
+|     20 |     20 |
+|     30 |     30 |
++--------+--------+
+(3 rows)
+
+!ok
+EnumerableJoin(condition=[=($0, $1)], joinType=[inner])
+  EnumerableAggregate(group=[{0}])
+    EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+      EnumerableTableScan(table=[[scott, DEPT]])
+  EnumerableAggregate(group=[{0}])
+    EnumerableCalc(expr#0..7=[{inputs}], DEPTNO=[$t7])
+      EnumerableTableScan(table=[[scott, EMP]])
+!plan
+
 # End join.oq