You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/04 09:35:03 UTC

[05/10] incubator-calcite git commit: [CALCITE-841] Redundant windows when window function arguments are expressions (Hsuan-Yi Chu)

[CALCITE-841] Redundant windows when window function arguments are expressions (Hsuan-Yi Chu)

In ProjectToWindowRule, two RexOvers are put in the same cohort if both of
the following conditions are satisfied:
(1) they have the same RexWindow;
(2) neither depends on the other.

Close apache/incubator-calcite#124


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/c1fb8299
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/c1fb8299
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/c1fb8299

Branch: refs/heads/master
Commit: c1fb82996b702397a2506970a31f3a6b4c7a956a
Parents: d69f2c2
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Mon Aug 24 18:24:59 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Sep 3 16:08:46 2015 -0700

----------------------------------------------------------------------
 .../calcite/rel/rules/ProjectToWindowRule.java  | 136 ++++++++++++++++++-
 .../apache/calcite/test/RelOptRulesTest.java    |  19 +++
 .../org/apache/calcite/test/RelOptRulesTest.xml |  23 ++++
 3 files changed, 172 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c1fb8299/core/src/main/java/org/apache/calcite/rel/rules/ProjectToWindowRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectToWindowRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectToWindowRule.java
index 3719253..92fd282 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectToWindowRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectToWindowRule.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.rel.rules;
 
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -37,13 +38,25 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexOver;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.calcite.util.graph.DefaultDirectedGraph;
+import org.apache.calcite.util.graph.DefaultEdge;
+import org.apache.calcite.util.graph.DirectedGraph;
+import org.apache.calcite.util.graph.TopologicalOrderIterator;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
-import java.util.Collections;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -239,11 +252,122 @@ public abstract class ProjectToWindowRule extends RelOptRule {
     }
 
     @Override protected List<Set<Integer>> getCohorts() {
-      // Here used to be the implementation that treats all the RexOvers
-      // as a single Cohort. This is flawed if the RexOvers
-      // depend on each other (i.e. the second one uses the result
-      // of the first).
-      return Collections.emptyList();
+      // Partitions the RexOvers of the program into cohorts. Two RexOvers
+      // are put in the same cohort only if both of these conditions hold:
+      // (1) they have the same RexWindow;
+      // (2) neither depends on the other (as decided by isDependent).
+      final List<RexNode> exprs = this.program.getExprList();
+      final DirectedGraph<Integer, DefaultEdge> graph =
+          createGraphFromExpression(exprs);
+      final List<Integer> rank = getRank(graph);
+      // One entry per cohort: its window and its RexOvers' indices in exprs.
+      final List<Pair<RexWindow, Set<Integer>>> windowToIndices = new ArrayList<>();
+      for (int i = 0; i < exprs.size(); ++i) {
+        final RexNode expr = exprs.get(i);
+        if (expr instanceof RexOver) {
+          final RexOver over = (RexOver) expr;
+
+          // Add this RexOver to the first existing cohort that satisfies
+          // both conditions, if there is one.
+          boolean isFound = false;
+          for (Pair<RexWindow, Set<Integer>> pair : windowToIndices) {
+            // Condition (1): same window.
+            if (pair.left.equals(over.getWindow())) {
+              // Condition (2): independent of every RexOver already in it.
+              boolean hasDependency = false;
+              for (int ordinal : pair.right) {
+                if (isDependent(graph, rank, ordinal, i)) {
+                  hasDependency = true;
+                  break;
+                }
+              }
+
+              if (!hasDependency) {
+                pair.right.add(i);
+                isFound = true;
+                break;
+              }
+            }
+          }
+
+          // No existing cohort accepted this RexOver; start a new one.
+          if (!isFound) {
+            final Set<Integer> newSet = Sets.newHashSet(i);
+            windowToIndices.add(Pair.of(over.getWindow(), newSet));
+          }
+        }
+      }
+      // Cohorts are returned in the order they were created.
+      final List<Set<Integer>> cohorts = new ArrayList<>();
+      for (Pair<RexWindow, Set<Integer>> pair : windowToIndices) {
+        cohorts.add(pair.right);
+      }
+      return cohorts;
+    }
+
+    /** Returns whether either expression transitively depends on the other.
+     * {@code rank} holds each vertex's position in a topological order. */
+    private boolean isDependent(final DirectedGraph<Integer, DefaultEdge> graph,
+        final List<Integer> rank,
+        final int ordinal1,
+        final int ordinal2) {
+      if (rank.get(ordinal2) > rank.get(ordinal1)) {
+        return isDependent(graph, rank, ordinal2, ordinal1);
+      }
+      // Now rank(ordinal2) <= rank(ordinal1), so only ordinal1 can depend on
+      // ordinal2. Search forward from ordinal2 along edges (an edge points
+      // from an expression to an expression that uses it). A vertex ranked
+      // after ordinal1 cannot lie on a path to ordinal1 and is pruned, but
+      // ordinal1 itself must stay reachable, hence "<=" rather than "<".
+      final Deque<Integer> dfs = new ArrayDeque<>();
+      final Set<Integer> visited = new HashSet<>();
+      dfs.push(ordinal2);
+      while (!dfs.isEmpty()) {
+        final int source = dfs.pop();
+        if (source == ordinal1) {
+          return true;
+        }
+        if (!visited.add(source)) {
+          continue; // already expanded
+        }
+        for (DefaultEdge e : graph.getOutwardEdges(source)) {
+          final int target = (int) e.target;
+          if (rank.get(target) <= rank.get(ordinal1)) {
+            dfs.push(target);
+          }
+        }
+      }
+      return false;
+    }
+
+    private List<Integer> getRank(DirectedGraph<Integer, DefaultEdge> graph) {
+      final int[] positions = new int[graph.vertexSet().size()]; // by vertex
+      int next = 0; // position of the next vertex in topological order
+      for (Integer vertex : TopologicalOrderIterator.of(graph)) {
+        positions[vertex] = next++;
+      }
+      return ImmutableIntList.of(positions);
+    }
+
+    /** Builds a graph with one vertex per expression and an edge from each
+     * expression to every expression that references it via a local ref. */
+    private DirectedGraph<Integer, DefaultEdge> createGraphFromExpression(
+        final List<RexNode> exprs) {
+      final DirectedGraph<Integer, DefaultEdge> graph = DefaultDirectedGraph.create();
+      for (int i = 0; i < exprs.size(); i++) {
+        graph.addVertex(i);
+      }
+      for (final Ord<RexNode> expr : Ord.zip(exprs)) {
+        expr.e.accept(
+            new RexVisitorImpl<Void>(true) {
+              @Override public Void visitLocalRef(RexLocalRef localRef) {
+                graph.addEdge(localRef.getIndex(), expr.i);
+                return null;
+              }
+            });
+      }
+      assert graph.vertexSet().size() == exprs.size();
+      return graph;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c1fb8299/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index fc6d5bb..bdea4af 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -1632,6 +1632,25 @@ public class RelOptRulesTest extends RelOptTestBase {
         + "where r < 2");
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-841">[CALCITE-841]
+   * Redundant windows when window function arguments are expressions</a>. */
+  @Test public void testExpressionInWindowFunction() {
+    HepProgram preProgram =  new HepProgramBuilder().build();
+
+    HepProgramBuilder builder = new HepProgramBuilder();
+    builder.addRuleClass(ProjectToWindowRule.class);
+
+    HepPlanner hepPlanner = new HepPlanner(builder.build());
+    hepPlanner.addRule(ProjectToWindowRule.PROJECT);
+    // Same window, neither aggregate uses the other: expect one window.
+    final String sql = "select\n"
+        + " sum(deptno) over(partition by deptno order by sal) as sum1,\n"
+        + "sum(deptno + sal) over(partition by deptno order by sal) as sum2\n"
+        + "from emp";
+    checkPlanning(tester, preProgram, hepPlanner, sql);
+  }
+
   @Test public void testPushAggregateThroughJoin1() throws Exception {
     final HepProgram preProgram = new HepProgramBuilder()
         .addRuleInstance(AggregateProjectMergeRule.INSTANCE)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c1fb8299/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index 199fb6b..4ab7a81 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -3424,6 +3424,29 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testExpressionInWindowFunction">
+        <Resource name="sql">
+            <![CDATA[
+select sum(deptno) over(partition by deptno order by sal) as sum1,
+sum(deptno + sal) over(partition by deptno order by sal) as sum2
+from emp
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+LogicalProject($0=[$3], $1=[$4])
+  LogicalWindow(window#0=[window(partition {1} order by [0] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [SUM($1), SUM($2)])])
+    LogicalProject(SAL=[$5], DEPTNO=[$7], $2=[+($7, $5)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalProject(SUM1=[SUM($7) OVER (PARTITION BY $7 ORDER BY $5 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], SUM2=[SUM(+($7, $5)) OVER (PARTITION BY $7 ORDER BY $5 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testPushAggregateThroughJoin1">
         <Resource name="sql">
             <![CDATA[select e.empno,d.deptno