You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2017/04/24 22:53:14 UTC

calcite git commit: [CALCITE-476] DISTINCT flag in windowed aggregates (Radu Tudoran)

Repository: calcite
Updated Branches:
  refs/heads/master 495fd1dc7 -> aed5bca7c


[CALCITE-476] DISTINCT flag in windowed aggregates (Radu Tudoran)

Adds support in parser, SqlToRelConverter, and RexOver;
but the runtime (EnumerableWindow) still ignores the flag.

Close apache/calcite#428


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aed5bca7
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aed5bca7
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aed5bca7

Branch: refs/heads/master
Commit: aed5bca7c5f49d117d7b794395ff4b6ad2d76bde
Parents: 495fd1d
Author: rtudoran <tu...@ymail.com>
Authored: Thu Apr 13 20:58:24 2017 +0200
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Apr 24 13:33:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/calcite/rel/core/Window.java     | 10 ++++--
 .../calcite/rel/logical/LogicalWindow.java      |  3 +-
 .../rel/rules/ProjectWindowTransposeRule.java   |  3 +-
 .../java/org/apache/calcite/rex/RexBuilder.java | 13 ++++---
 .../java/org/apache/calcite/rex/RexOver.java    | 37 +++++++++++++++++---
 .../java/org/apache/calcite/rex/RexShuttle.java |  3 +-
 .../calcite/sql2rel/SqlToRelConverter.java      | 25 ++++++++++---
 .../apache/calcite/test/RelOptRulesTest.java    |  2 ++
 .../calcite/test/SqlToRelConverterTest.java     | 25 +++++++++++++
 .../calcite/test/SqlToRelConverterTest.xml      | 33 +++++++++++++++++
 10 files changed, 135 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rel/core/Window.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Window.java b/core/src/main/java/org/apache/calcite/rel/core/Window.java
index 462f5cb..b9d92ac 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Window.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Window.java
@@ -314,7 +314,7 @@ public abstract class Window extends SingleRel {
         public AggregateCall get(int index) {
           final RexWinAggCall aggCall = aggCalls.get(index);
           return AggregateCall.create((SqlAggFunction) aggCall.getOperator(),
-              false, getProjectOrdinals(aggCall.getOperands()), -1,
+              aggCall.distinct, getProjectOrdinals(aggCall.getOperands()), -1,
               aggCall.getType(), fieldNames.get(aggCall.ordinal));
         }
       };
@@ -336,6 +336,9 @@ public abstract class Window extends SingleRel {
      */
     public final int ordinal;
 
+    /** Whether to eliminate duplicates before applying aggregate function. */
+    public final boolean distinct;
+
     /**
      * Creates a RexWinAggCall.
      *
@@ -343,14 +346,17 @@ public abstract class Window extends SingleRel {
      * @param type     Result type
      * @param operands Operands to call
      * @param ordinal  Ordinal within its partition
+     * @param distinct Whether to eliminate duplicates before aggregating
      */
     public RexWinAggCall(
         SqlAggFunction aggFun,
         RelDataType type,
         List<RexNode> operands,
-        int ordinal) {
+        int ordinal,
+        boolean distinct) {
       super(type, aggFun, operands);
       this.ordinal = ordinal;
+      this.distinct = distinct;
     }
 
     @Override public RexCall clone(RelDataType type, List<RexNode> operands) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java
index abd703a..1b6445e 100644
--- a/core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java
@@ -156,7 +156,8 @@ public final class LogicalWindow extends Window {
                 over.getAggOperator(),
                 over.getType(),
                 toInputRefs(over.operands),
-                aggMap.size());
+                aggMap.size(),
+                over.isDistinct());
         aggCalls.add(aggCall);
         aggMap.put(over, aggCall);
       }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
index 98f22ea..0cf2949 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
@@ -111,7 +111,8 @@ public class ProjectWindowTransposeRule extends RelOptRule {
                 (SqlAggFunction) call.getOperator(),
                 call.getType(),
                 clonedOperands,
-                ((Window.RexWinAggCall) call).ordinal);
+                ((Window.RexWinAggCall) call).ordinal,
+                ((Window.RexWinAggCall) call).distinct);
           } else {
             return call;
           }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
index eefb2d8..1906227 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
@@ -335,7 +335,8 @@ public class RexBuilder {
       RexWindowBound upperBound,
       boolean physical,
       boolean allowPartial,
-      boolean nullWhenCountZero) {
+      boolean nullWhenCountZero,
+      boolean distinct) {
     assert operator != null;
     assert exprs != null;
     assert partitionKeys != null;
@@ -347,7 +348,7 @@ public class RexBuilder {
             lowerBound,
             upperBound,
             physical);
-    final RexOver over = new RexOver(type, operator, exprs, window);
+    final RexOver over = new RexOver(type, operator, exprs, window, distinct);
     RexNode result = over;
 
     // This should be correct but need time to go over test results.
@@ -363,14 +364,15 @@ public class RexBuilder {
                   bigintType,
                   SqlStdOperatorTable.COUNT,
                   exprs,
-                  window),
+                  window,
+                  distinct),
               makeLiteral(
                   BigDecimal.ZERO,
                   bigintType,
                   SqlTypeName.DECIMAL)),
           ensureType(type, // SUM0 is non-nullable, thus need a cast
               new RexOver(typeFactory.createTypeWithNullability(type, false),
-              operator, exprs, window),
+              operator, exprs, window, distinct),
               false),
           makeCast(type, constantNull()));
     }
@@ -388,7 +390,8 @@ public class RexBuilder {
                       bigintType,
                       SqlStdOperatorTable.COUNT,
                       ImmutableList.<RexNode>of(),
-                      window),
+                      window,
+                      distinct),
                   makeLiteral(
                       BigDecimal.valueOf(2),
                       bigintType,

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rex/RexOver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexOver.java b/core/src/main/java/org/apache/calcite/rex/RexOver.java
index d2c4684..38d1f9c 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexOver.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexOver.java
@@ -35,13 +35,15 @@ public class RexOver extends RexCall {
   //~ Instance fields --------------------------------------------------------
 
   private final RexWindow window;
+  private final boolean distinct;
 
   //~ Constructors -----------------------------------------------------------
 
   /**
    * Creates a RexOver.
    *
-   * <p>For example, "SUM(x) OVER (ROWS 3 PRECEDING)" is represented as:
+   * <p>For example, "SUM(DISTINCT x) OVER (ROWS 3 PRECEDING)" is represented
+   * as:
    *
    * <ul>
    * <li>type = Integer,
@@ -54,15 +56,18 @@ public class RexOver extends RexCall {
    * @param op       Aggregate operator
    * @param operands Operands list
    * @param window   Window specification
+   * @param distinct Whether the aggregate is applied to distinct values only
    */
   RexOver(
       RelDataType type,
       SqlAggFunction op,
       List<RexNode> operands,
-      RexWindow window) {
+      RexWindow window,
+      boolean distinct) {
     super(type, op, operands);
     Preconditions.checkArgument(op.isAggregator());
     this.window = Preconditions.checkNotNull(window);
+    this.distinct = distinct;
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -78,8 +83,32 @@ public class RexOver extends RexCall {
     return window;
   }
 
-  protected String computeDigest(boolean withType) {
-    return super.computeDigest(withType) + " OVER (" + window + ")";
+  public boolean isDistinct() {
+    return distinct;
+  }
+
+  @Override protected String computeDigest(boolean withType) {
+    final StringBuilder sb = new StringBuilder(op.getName());
+    sb.append("(");
+    if (distinct) {
+      sb.append("DISTINCT ");
+    }
+    for (int i = 0; i < operands.size(); i++) {
+      if (i > 0) {
+        sb.append(", ");
+      }
+      RexNode operand = operands.get(i);
+      sb.append(operand.toString());
+    }
+    sb.append(")");
+    if (withType) {
+      sb.append(":");
+      sb.append(type.getFullTypeString());
+    }
+    sb.append(" OVER (")
+        .append(window)
+        .append(")");
+    return sb.toString();
   }
 
   public <R> R accept(RexVisitor<R> visitor) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexShuttle.java b/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
index 36b9e40..b642503 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
@@ -50,7 +50,8 @@ public class RexShuttle implements RexVisitor<RexNode> {
           over.getType(),
           over.getAggOperator(),
           clonedOperands,
-          window);
+          window,
+          over.isDistinct());
     } else {
       return over;
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 5aa5ec8..573214e 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -1868,13 +1868,23 @@ public class SqlToRelConverter {
       // Walk over the tree and apply 'over' to all agg functions. This is
       // necessary because the returned expression is not necessarily a call
       // to an agg function. For example, AVG(x) becomes SUM(x) / COUNT(x).
+
+      boolean isDistinct = false;
+      if (aggCall.getFunctionQuantifier() != null
+        && aggCall.getFunctionQuantifier().getValue().equals(SqlSelectKeyword.DISTINCT)) {
+        isDistinct = true;
+      }
+
       final RexShuttle visitor =
           new HistogramShuttle(
               partitionKeys.build(), orderKeys.build(),
               RexWindowBound.create(window.getLowerBound(), lowerBound),
               RexWindowBound.create(window.getUpperBound(), upperBound),
-              window);
-      return rexAgg.accept(visitor);
+              window,
+              isDistinct);
+      RexNode overNode = rexAgg.accept(visitor);
+
+      return overNode;
     } finally {
       bb.window = null;
     }
@@ -4957,17 +4967,20 @@ public class SqlToRelConverter {
     private final RexWindowBound lowerBound;
     private final RexWindowBound upperBound;
     private final SqlWindow window;
+    private final boolean distinct;
 
     HistogramShuttle(
         List<RexNode> partitionKeys,
         ImmutableList<RexFieldCollation> orderKeys,
         RexWindowBound lowerBound, RexWindowBound upperBound,
-        SqlWindow window) {
+        SqlWindow window,
+        boolean distinct) {
       this.partitionKeys = partitionKeys;
       this.orderKeys = orderKeys;
       this.lowerBound = lowerBound;
       this.upperBound = upperBound;
       this.window = window;
+      this.distinct = distinct;
     }
 
     public RexNode visitCall(RexCall call) {
@@ -5023,7 +5036,8 @@ public class SqlToRelConverter {
                 upperBound,
                 window.isRows(),
                 window.isAllowPartial(),
-                false);
+                false,
+                distinct);
 
         RexNode histogramCall =
             rexBuilder.makeCall(
@@ -5063,7 +5077,8 @@ public class SqlToRelConverter {
             upperBound,
             window.isRows(),
             window.isAllowPartial(),
-            needSum0);
+            needSum0,
+            distinct);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index efb77e7..8f553bb 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -112,6 +112,8 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
+
+
 /**
  * Unit test for rules in {@code org.apache.calcite.rel} and subpackages.
  *

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 91889b2..494ec44 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -443,6 +443,31 @@ public class SqlToRelConverterTest extends SqlToRelTestBase {
     sql("select distinct sal + 5 from emp").ok();
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-476">[CALCITE-476]
+   * DISTINCT flag in windowed aggregates</a>. */
+  @Test public void testSelectOverDistinct() {
+    // Checks that the DISTINCT quantifier of <aggregate>(DISTINCT x) is
+    // preserved as a flag on the windowed aggregate call.
+    final String sql = "select SUM(DISTINCT deptno)\n"
+        + "over (ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)\n"
+        + "from emp\n";
+    sql(sql).ok();
+  }
+
+  /** As {@link #testSelectOverDistinct()} but for streaming queries. */
+  @Test public void testSelectStreamPartitionDistinct() {
+    final String sql = "select stream\n"
+        + "  count(distinct orderId) over (partition by productId\n"
+        + "    order by rowtime\n"
+        + "    range interval '1' second preceding) as c,\n"
+        + "  count(distinct orderId) over w as c2,\n"
+        + "  count(orderId) over w as c3\n"
+        + "from orders\n"
+        + "window w as (partition by productId)";
+    sql(sql).ok();
+  }
+
   @Test public void testSelectDistinctGroup() {
     sql("select distinct sum(sal) from emp group by deptno").ok();
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aed5bca7/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index f8374f3..f949786 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -235,6 +235,39 @@ LogicalProject(DEPTNO=[$0], NAME=[$1])
             <![CDATA[select*from unnest(multiset(select*from dept))]]>
         </Resource>
     </TestCase>
+    <TestCase name="testSelectOverDistinct">
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(EXPR$0=[CASE(>(COUNT(DISTINCT $7) OVER (ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), 0), CAST($SUM0(DISTINCT $7) OVER (ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)):INTEGER, null)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+        <Resource name="sql">
+            <![CDATA[select SUM(DISTINCT deptno)
+over (ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)
+from emp
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testSelectStreamPartitionDistinct">
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalProject(C=[COUNT(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $0 RANGE BETWEEN 1000 PRECEDING AND CURRENT ROW)], C2=[COUNT(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)], C3=[COUNT($2) OVER (PARTITION BY $1 ORDER BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)])
+    LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+        </Resource>
+        <Resource name="sql">
+            <![CDATA[select stream
+  count(distinct orderId) over (partition by productId
+    order by rowtime
+    range interval '1' second preceding) as c,
+  count(distinct orderId) over w as c2,
+  count(orderId) over w as c3
+from orders
+window w as (partition by productId)]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testUnnestArray">
         <Resource name="sql">
             <![CDATA[select*from unnest(array(select*from dept))]]>