You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2017/06/22 16:09:37 UTC

calcite git commit: [CALCITE-1853] Push Count distinct into Druid when approximate results are acceptable (Zain Humayun)

Repository: calcite
Updated Branches:
  refs/heads/master 03bb2cea5 -> 898c2d66a


[CALCITE-1853] Push Count distinct into Druid when approximate results are acceptable (Zain Humayun)

Close apache/calcite#478


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/898c2d66
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/898c2d66
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/898c2d66

Branch: refs/heads/master
Commit: 898c2d66a18af9f97f8061803a19f211d3d6ed85
Parents: 03bb2ce
Author: Zain Humayun <zh...@yahoo-inc.com>
Authored: Tue Jun 20 13:10:01 2017 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jun 22 17:09:23 2017 +0100

----------------------------------------------------------------------
 .../calcite/adapter/druid/DruidQuery.java       |  18 ++-
 .../calcite/adapter/druid/DruidRules.java       |  38 ++++---
 .../calcite/adapter/druid/DruidTable.java       |   4 +
 .../org/apache/calcite/test/DruidAdapterIT.java | 112 +++++++++++++++++++
 4 files changed, 153 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/898c2d66/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 39ce4a8..1c39fc6 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -480,13 +480,15 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     return getQuerySpec().queryString;
   }
 
+  protected CalciteConnectionConfig getConnectionConfig() {
+    return getCluster().getPlanner().getContext().unwrap(CalciteConnectionConfig.class);
+  }
+
   protected QuerySpec getQuery(RelDataType rowType, RexNode filter, List<RexNode> projects,
       ImmutableBitSet groupSet, List<AggregateCall> aggCalls, List<String> aggNames,
       List<Integer> collationIndexes, List<Direction> collationDirections,
       ImmutableBitSet numericCollationIndexes, Integer fetch) {
-    final CalciteConnectionConfig config =
-        getCluster().getPlanner().getContext()
-            .unwrap(CalciteConnectionConfig.class);
+    final CalciteConnectionConfig config = getConnectionConfig();
     QueryType queryType = QueryType.SELECT;
     final Translator translator = new Translator(druidTable, rowType);
     List<String> fieldNames = rowType.getFieldNames();
@@ -805,10 +807,18 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
       // Cannot handle this aggregate function type
       throw new AssertionError("unknown aggregate type " + type);
     }
+    CalciteConnectionConfig config = getConnectionConfig();
     switch (aggCall.getAggregation().getKind()) {
     case COUNT:
       if (aggCall.isDistinct()) {
-        return new JsonCardinalityAggregation("cardinality", name, list);
+        if (config.approximateDistinctCount()) {
+          return new JsonCardinalityAggregation("cardinality", name, list);
+        } else {
+          // Gets thrown if one of the rules allows a count(distinct ...) through
+          // even though approximate results have not been declared acceptable.
+          throw new UnsupportedOperationException("Cannot push " + aggCall
+              + " because an approximate count distinct is not acceptable.");
+        }
       }
       return new JsonAggregation("count", name, only);
     case SUM:

http://git-wip-us.apache.org/repos/asf/calcite/blob/898c2d66/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index ef5d6f2..de65a3a 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -113,20 +113,30 @@ public class DruidRules {
           SORT_PROJECT_TRANSPOSE);
 
   /** Predicate that returns whether Druid can not handle an aggregate. */
-  private static final Predicate<Aggregate> BAD_AGG =
-      new PredicateImpl<Aggregate>() {
-        public boolean test(Aggregate aggregate) {
-          final CalciteConnectionConfig config =
-                  aggregate.getCluster().getPlanner().getContext()
-                      .unwrap(CalciteConnectionConfig.class);
+  private static final Predicate<Triple<Aggregate, RelNode, DruidQuery>> BAD_AGG =
+      new PredicateImpl<Triple<Aggregate, RelNode, DruidQuery>>() {
+        public boolean test(Triple<Aggregate, RelNode, DruidQuery> triple) {
+          final Aggregate aggregate = triple.getLeft();
+          final RelNode node = triple.getMiddle();
+          final DruidQuery query = triple.getRight();
+
+          final CalciteConnectionConfig config = query.getConnectionConfig();
           for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
             switch (aggregateCall.getAggregation().getKind()) {
             case COUNT:
-              if (!aggregateCall.getArgList().isEmpty()) {
-                // Cannot handle this aggregate function
+              // Druid can handle 2 scenarios:
+              // 1. count(distinct col) when approximate results
+              //    are acceptable and col is not a metric
+              // 2. count(*)
+              if (checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()),
+                      node, query)) {
                 return true;
               }
-              break;
+              if ((config.approximateDistinctCount() && aggregateCall.isDistinct())
+                      || aggregateCall.getArgList().isEmpty()) {
+                continue;
+              }
+              return true;
             case SUM:
             case SUM0:
             case MIN:
@@ -264,8 +274,7 @@ public class DruidRules {
         } else {
           boolean filterOnMetrics = false;
           for (Integer i : visitor.inputPosReferenced) {
-            if (input.druidTable.metricFieldNames.contains(
-                    input.getRowType().getFieldList().get(i).getName())) {
+            if (input.druidTable.isMetric(input.getRowType().getFieldList().get(i).getName())) {
               // Filter on metrics, not supported in Druid
               filterOnMetrics = true;
               break;
@@ -398,7 +407,7 @@ public class DruidRules {
       }
       if (aggregate.indicator
               || aggregate.getGroupSets().size() != 1
-              || BAD_AGG.apply(aggregate)
+              || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) aggregate, query))
               || !validAggregate(aggregate, query)) {
         return;
       }
@@ -445,7 +454,7 @@ public class DruidRules {
       }
       if (aggregate.indicator
               || aggregate.getGroupSets().size() != 1
-              || BAD_AGG.apply(aggregate)
+              || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) project, query))
               || !validAggregate(aggregate, timestampIdx)) {
         return;
       }
@@ -698,8 +707,7 @@ public class DruidRules {
       set = newSet.build();
     }
     for (int index : set) {
-      if (query.druidTable.metricFieldNames
-              .contains(query.getTopNode().getRowType().getFieldNames().get(index))) {
+      if (query.druidTable.isMetric(query.getTopNode().getRowType().getFieldNames().get(index))) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/898c2d66/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
index e92fd4b..656f20f 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
@@ -122,6 +122,10 @@ public class DruidTable extends AbstractTable implements TranslatableTable {
         ImmutableList.<RelNode>of(scan));
   }
 
+  public boolean isMetric(String name) {
+    return metricFieldNames.contains(name);
+  }
+
   /** Creates a {@link RelDataType} from a map of
    * field names and types. */
   private static class MapRelProtoDataType implements RelProtoDataType {

http://git-wip-us.apache.org/repos/asf/calcite/blob/898c2d66/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index cb2024d..e04aba1 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -2197,6 +2197,118 @@ public class DruidAdapterIT {
             .queryContains(druidChecker(druidFilter))
             .returnsOrdered("EXPR$0=11");
   }
+
+  /**
+   * Test to ensure that count(distinct ...) gets pushed to Druid when approximate results are
+   * acceptable.
+   */
+  @Test public void testDistinctCountWhenApproxResultsAccepted() {
+    String sql = "select count(distinct \"customer_id\") from \"foodmart\"";
+    String expectedSubExplain = "DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00"
+        + ":00:00.000/2992-01-10T00:00:00.000]], groups=[{}], aggs=[[COUNT(DISTINCT $20)]])";
+    String expectedAggregate = "{'type':'cardinality','name':"
+        + "'EXPR$0','fieldNames':['customer_id']}";
+
+    testCountWithApproxDistinct(true, sql, expectedSubExplain, expectedAggregate);
+  }
+
+  /**
+   * Test to ensure that count(distinct ...) doesn't get pushed to Druid when approximate results
+   * are not acceptable
+   */
+  @Test public void testDistinctCountWhenApproxResultsNotAccepted() {
+    String sql = "select count(distinct \"customer_id\") from \"foodmart\"";
+    String expectedSubExplain = "  BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
+        + "groups=[{20}], aggs=[[]])";
+
+    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that a count distinct on metric does not get pushed into Druid
+   */
+  @Test public void testDistinctCountOnMetric() {
+    String sql = "select count(distinct \"store_sales\") from \"foodmart\" "
+        + "where \"store_state\" = 'WA'";
+    String expectedSubExplain = "  BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n"
+        + "    BindableAggregate(group=[{1}])";
+
+    testCountWithApproxDistinct(true, sql, expectedSubExplain);
+    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that a count on a metric does not get pushed into Druid
+   */
+  @Test public void testCountOnMetric() {
+    String sql = "select \"brand_name\", count(\"store_sales\") from \"foodmart\" "
+        + "group by \"brand_name\"";
+    String expectedSubExplain = "  BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/"
+        + "2992-01-10T00:00:00.000]], projects=[[$2, $90]])";
+
+    testCountWithApproxDistinct(true, sql, expectedSubExplain);
+    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that count(*) is pushed into Druid
+   */
+  @Test public void testCountStar() {
+    String sql = "select count(*) from \"foodmart\"";
+    String expectedSubExplain = "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
+        + "projects=[[]], groups=[{}], aggs=[[COUNT()]])";
+
+    sql(sql).explainContains(expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that count() aggregates with metric columns are not pushed into Druid
+   * even when the metric column has been renamed
+   */
+  @Test public void testCountOnMetricRenamed() {
+    String sql = "select \"B\", count(\"A\") from "
+        + "(select \"unit_sales\" as \"A\", \"customer_id\" as \"B\" from \"foodmart\") "
+        + "group by \"B\"";
+    String expectedSubExplain = "  BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000"
+        + "/2992-01-10T00:00:00.000]], projects=[[$20, $89]])\n";
+
+    testCountWithApproxDistinct(true, sql, expectedSubExplain);
+    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+  }
+
+  @Test public void testDistinctCountOnMetricRenamed() {
+    String sql = "select \"B\", count(distinct \"A\") from "
+        + "(select \"unit_sales\" as \"A\", \"customer_id\" as \"B\" from \"foodmart\") "
+        + "group by \"B\"";
+    String expectedSubExplain = "  BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:"
+        + "00.000/2992-01-10T00:00:00.000]], projects=[[$20, $89]], groups=[{0, 1}], "
+        + "aggs=[[]])";
+
+    testCountWithApproxDistinct(true, sql, expectedSubExplain);
+    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+  }
+
+  private void testCountWithApproxDistinct(boolean approx, String sql, String expectedExplain) {
+    testCountWithApproxDistinct(approx, sql, expectedExplain, "");
+  }
+
+  private void testCountWithApproxDistinct(boolean approx, String sql, String expectedExplain,
+      String expectedDruidQuery) {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(ImmutableMap.of("model", FOODMART.getPath()))
+        .with(CalciteConnectionProperty.APPROXIMATE_DISTINCT_COUNT.camelName(), approx)
+        .query(sql)
+        .runs()
+        .explainContains(expectedExplain)
+        .queryContains(druidChecker(expectedDruidQuery));
+  }
 }
 
 // End DruidAdapterIT.java