You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2017/06/19 14:33:49 UTC

[2/2] calcite git commit: [CALCITE-1805] Druid adapter incorrectly pushes down "COUNT(c)"; Druid only supports "COUNT(*)"

[CALCITE-1805] Druid adapter incorrectly pushes down "COUNT(c)"; Druid only supports "COUNT(*)"

Fixes DruidAdapterIT.testGroupByAvgSumCount and DruidAdapterIT.testTimeExtractThatCannotBePushed


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e162df1c
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e162df1c
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e162df1c

Branch: refs/heads/master
Commit: e162df1cfeb615da909ac649998e81f3aa3faf20
Parents: 12e020e
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Jun 19 15:29:11 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon Jun 19 15:33:12 2017 +0100

----------------------------------------------------------------------
 .../calcite/interpreter/AggregateNode.java      | 90 ++++++++++++++++++--
 .../org/apache/calcite/test/DruidAdapterIT.java | 15 +---
 2 files changed, 88 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/e162df1c/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 5a5d265..6a54b87 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -136,6 +136,24 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
       }
       return new UdaAccumulatorFactory(
           AggregateFunctionImpl.create(clazz), call);
+    } else if (call.getAggregation() == SqlStdOperatorTable.SUM0) {
+      final Class<?> clazz;
+      switch (call.type.getSqlTypeName()) {
+      case DOUBLE:
+      case REAL:
+      case FLOAT:
+        clazz = DoubleSum0.class;
+        break;
+      case INTEGER:
+        clazz = IntSum0.class;
+        break;
+      case BIGINT:
+      default:
+        clazz = LongSum0.class;
+        break;
+      }
+      return new UdaAccumulatorFactory(
+          AggregateFunctionImpl.create(clazz), call);
     } else {
       final JavaTypeFactory typeFactory =
           (JavaTypeFactory) rel.getCluster().getTypeFactory();
@@ -379,8 +397,8 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
   public static class IntSum {
     public IntSum() {
     }
-    public int init() {
-      return 0;
+    public Integer init() {
+      return null;
     }
     public int add(int accumulator, int v) {
       return accumulator + v;
@@ -388,7 +406,7 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
     public int merge(int accumulator0, int accumulator1) {
       return accumulator0 + accumulator1;
     }
-    public int result(int accumulator) {
+    public Integer result(Integer accumulator) {
       return accumulator;
     }
   }
@@ -398,8 +416,8 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
   public static class LongSum {
     public LongSum() {
     }
-    public long init() {
-      return 0L;
+    public Long init() {
+      return null;
     }
     public long add(long accumulator, long v) {
       return accumulator + v;
@@ -407,7 +425,7 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
     public long merge(long accumulator0, long accumulator1) {
       return accumulator0 + accumulator1;
     }
-    public long result(long accumulator) {
+    public Long result(Long accumulator) {
       return accumulator;
     }
   }
@@ -417,6 +435,63 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
   public static class DoubleSum {
     public DoubleSum() {
     }
+    public Double init() {
+      return null;
+    }
+    public double add(double accumulator, double v) {
+      return accumulator + v;
+    }
+    public double merge(double accumulator0, double accumulator1) {
+      return accumulator0 + accumulator1;
+    }
+    public Double result(Double accumulator) {
+      return accumulator;
+    }
+  }
+
+  /** Implementation of {@code SUM0} over INTEGER values as a user-defined
+   * aggregate. */
+  public static class IntSum0 {
+    public IntSum0() {
+    }
+    public int init() {
+      return 0;
+    }
+    public int add(int accumulator, int v) {
+      return accumulator + v;
+    }
+    public int merge(int accumulator0, int accumulator1) {
+      return accumulator0 + accumulator1;
+    }
+    public int result(int accumulator) {
+      return accumulator;
+    }
+  }
+
+  /** Implementation of {@code SUM0} over BIGINT values as a user-defined
+   * aggregate. */
+  public static class LongSum0 {
+    public LongSum0() {
+    }
+    public long init() {
+      return 0L;
+    }
+    public long add(long accumulator, long v) {
+      return accumulator + v;
+    }
+    public long merge(long accumulator0, long accumulator1) {
+      return accumulator0 + accumulator1;
+    }
+    public long result(long accumulator) {
+      return accumulator;
+    }
+  }
+
+  /** Implementation of {@code SUM0} over DOUBLE values as a user-defined
+   * aggregate. */
+  public static class DoubleSum0 {
+    public DoubleSum0() {
+    }
     public double init() {
       return 0D;
     }
@@ -480,6 +555,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
 
     public void send(Row row) {
       final Object[] args = {value, row.getValues()[factory.argOrdinal]};
+      if (args[0] == null) {
+        args[0] = 0; // first value, set to zero
+      }
       for (int i = 1; i < args.length; i++) {
         if (args[i] == null) {
           return; // one of the arguments is null; don't add to the total

http://git-wip-us.apache.org/repos/asf/calcite/blob/e162df1c/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 0cd7a4b..719d3bd 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1071,17 +1071,11 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by \"state_province\"\n"
         + "order by 1";
-    String druidQuery = "'aggregations':["
-        + "{'type':'longSum','name':'$f1','fieldName':'unit_sales'},"
-        + "{'type':'count','name':'$f2','fieldName':'unit_sales'},"
-        + "{'type':'count','name':'C','fieldName':'store_sqft'},"
-        + "{'type':'count','name':'C0'}],"
-        + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000']}";
     sql(sql)
         .limit(2)
-        .returnsUnordered("state_province=CA; A=3; S=74748; C=24441; C0=24441",
+        .returnsUnordered("state_province=CA; A=3; S=74748; C=16347; C0=24441",
             "state_province=OR; A=3; S=67659; C=21610; C0=21610")
-        .queryContains(druidChecker(druidQuery));
+        .queryContains(druidChecker("'queryType':'select'"));
   }
 
   @Test public void testGroupByMonthGranularity() {
@@ -2075,13 +2069,12 @@ public class DruidAdapterIT {
         + "\"product_id\" = 1558 group by extract(CENTURY from \"timestamp\")";
     final String plan = "PLAN=EnumerableInterpreter\n"
         + "  BindableAggregate(group=[{0}])\n"
-        + "    BindableProject(EXPR$0=[/INT(EXTRACT_DATE(FLAG(YEAR), /INT(Reinterpret($0), "
-        + "86400000)), 100)])\n"
+        + "    BindableProject(EXPR$0=[EXTRACT_DATE(FLAG(CENTURY), /INT(Reinterpret($0), 86400000))])\n"
         + "      DruidQuery(table=[[foodmart, foodmart]], "
         + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], filter=[=($1, 1558)], "
         + "projects=[[$0]])";
     sql(sql).explainContains(plan).queryContains(druidChecker("'queryType':'select'"))
-        .returnsUnordered("EXPR$0=19");
+        .returnsUnordered("EXPR$0=20");
   }
 
   /** Test case for