You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@calcite.apache.org by se...@apache.org on 2019/01/23 02:53:50 UTC

[calcite] branch master updated: [CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter

This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new fc10982  [CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter
fc10982 is described below

commit fc10982bc856a01c23928de15441822de2d3ffde
Author: Andrei Sereda <25...@users.noreply.github.com>
AuthorDate: Thu Jan 17 18:21:14 2019 -0500

    [CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter
    
    Convert the approximate distinct count `APPROX_COUNT_DISTINCT` into an Elasticsearch
    [cardinality](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html)
    single-value metrics aggregation.
    
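    Remove AggregateExpandDistinctAggregatesRule from the Elasticsearch planner so that
    `APPROX_COUNT_DISTINCT` is not expanded into two nested aggregations and can instead be
    recognized and translated as a single cardinality aggregation.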
---
 .../elasticsearch/ElasticsearchAggregate.java      | 40 ++++++++++++++--------
 .../adapter/elasticsearch/ElasticsearchTable.java  |  2 +-
 .../elasticsearch/ElasticsearchTableScan.java      |  5 +++
 .../adapter/elasticsearch/AggregationTest.java     | 30 ++++++++++++++++
 .../elasticsearch/ElasticSearchAdapterTest.java    | 29 +++++++---------
 5 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
index e2810f5..8ca1397 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -28,8 +28,12 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ImmutableBitSet;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -68,11 +72,14 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
     assert this.groupSets.size() == 1 : "Grouping sets not supported";
 
     for (AggregateCall aggCall : aggCalls) {
-      if (aggCall.isDistinct()) {
-        throw new InvalidRelException("distinct aggregation not supported");
+      if (aggCall.isDistinct() && !aggCall.isApproximate()) {
+        final String message = String.format(Locale.ROOT, "Only approximate distinct "
+            + "aggregations are supported in Elastic (cardinality aggregation). Use %s function",
+            SqlStdOperatorTable.APPROX_COUNT_DISTINCT.getName());
+        throw new InvalidRelException(message);
       }
 
-      SqlKind kind = aggCall.getAggregation().getKind();
+      final SqlKind kind = aggCall.getAggregation().getKind();
       if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
         final String message = String.format(Locale.ROOT,
             "Aggregation %s not supported (use one of %s)", kind, SUPPORTED_AGGREGATIONS);
@@ -111,22 +118,24 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
       implementor.addGroupBy(inputFields.get(group));
     }
 
+    final ObjectMapper mapper = implementor.elasticsearchTable.mapper;
+
     for (AggregateCall aggCall : aggCalls) {
-      List<String> names = new ArrayList<>();
+      final List<String> names = new ArrayList<>();
       for (int i : aggCall.getArgList()) {
         names.add(inputFields.get(i));
       }
 
-      final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
-      // for ANY_VALUE return just a single result
-      final String size = aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE ? ", \"size\": 1"
-           : "";
+      final ObjectNode aggregation = mapper.createObjectNode();
+      final ObjectNode field = aggregation.with(toElasticAggregate(aggCall));
 
-      final String op = String.format(Locale.ROOT, "{\"%s\":{\"field\": \"%s\" %s}}",
-          toElasticAggregate(aggCall),
-          name, size);
+      final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
+      field.put("field", name);
+      if (aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE) {
+        field.put("size", 1);
+      }
 
-      implementor.addAggregation(aggCall.getName(), op);
+      implementor.addAggregation(aggCall.getName(), aggregation.toString());
     }
   }
 
@@ -136,11 +145,12 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
    * function. But currently only one-to-one mapping is supported between sql agg and elastic
    * aggregation.
    */
-  private String toElasticAggregate(AggregateCall call) {
-    SqlKind kind = call.getAggregation().getKind();
+  private static String toElasticAggregate(AggregateCall call) {
+    final SqlKind kind = call.getAggregation().getKind();
     switch (kind) {
     case COUNT:
-      return call.isApproximate() ? "cardinality" : "value_count";
+      // approx_count_distinct() vs count()
+      return call.isDistinct() && call.isApproximate() ? "cardinality" : "value_count";
     case SUM:
       return "sum";
     case MIN:
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index 0b32f89..f4d0028 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -74,7 +74,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
   private final String indexName;
   private final String typeName;
   final ObjectMapper mapper;
-  private final ElasticsearchTransport transport;
+  final ElasticsearchTransport transport;
 
   /**
    * Creates an ElasticsearchTable.
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
index 3dd041a..729c266 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.type.RelDataType;
 
 import java.util.List;
@@ -78,6 +79,10 @@ public class ElasticsearchTableScan extends TableScan implements ElasticsearchRe
     for (RelOptRule rule: ElasticsearchRules.RULES) {
       planner.addRule(rule);
     }
+
+    // remove this rule otherwise elastic can't correctly interpret approx_count_distinct()
+    // it is converted to cardinality aggregation in Elastic
+    planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
   }
 
   @Override public void implement(Implementor implementor) {
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index 0586e88..ecbcc23 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -299,6 +299,36 @@ public class AggregationTest {
             "cat5=null; EXPR$1=0.0",
             "cat5=2; EXPR$1=7.0");
   }
+
+  /**
+   * Validates {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#APPROX_COUNT_DISTINCT}.
+   */
+  @Test
+  public void approximateCountDistinct() {
+    // approx_count_distinct counts distinct *non-null* values
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select approx_count_distinct(cat1) from view")
+        .returnsUnordered("EXPR$0=2");
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select approx_count_distinct(cat2) from view")
+        .returnsUnordered("EXPR$0=2");
+    // with GROUP BY, every group (including the cat1=null group) gets its own count
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat1, approx_count_distinct(val1) from view group by cat1")
+        .returnsUnordered("cat1=a; EXPR$1=1",
+                          "cat1=b; EXPR$1=1",
+                          "cat1=null; EXPR$1=0");
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat1, approx_count_distinct(val2) from view group by cat1")
+        .returnsUnordered("cat1=a; EXPR$1=0",
+                          "cat1=b; EXPR$1=1",
+                          "cat1=null; EXPR$1=1");
+  }
 }
 
 // End AggregationTest.java
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index f5a1265..507d106 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -31,7 +31,6 @@ import com.google.common.io.Resources;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -647,29 +646,27 @@ public class ElasticSearchAdapterTest {
   }
 
   /**
-   * Checks
-   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html">Cardinality</a>
-   * aggregation {@code approx_count_distinct}
+   * Test of {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#APPROX_COUNT_DISTINCT} which
+   * will be translated to
+   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html">Cardinality Aggregation</a>
+   * (approximate counts using HyperLogLog++ algorithm).
    */
   @Test
-  @Ignore
   public void approximateCount() throws Exception {
-    // approx_count_distinct is converted into two aggregations. needs investigation
-    // ElasticsearchAggregate(group=[{1}], EXPR$0=[COUNT($0)])\r
-    //  ElasticsearchAggregate(group=[{0, 1}])\r
     calciteAssert()
-        .query("select approx_count_distinct(city), state from zips group by state "
-            + "order by state limit 3")
+        .query("select state, approx_count_distinct(city), approx_count_distinct(pop) from zips"
+            + " group by state order by state limit 3")
         .queryContains(
             ElasticsearchChecker.elasticsearchChecker("'_source':false",
             "size:0",
-            "aggregations:{'g_state':{terms:{field:state, size:3, "
+            "aggregations:{'g_state':{terms:{field:'state', missing:'__MISSING__', size:3, "
                 + "order:{'_key':'asc'}}",
-            "aggregations:{'EXPR$0':{cardinality:{field:city}} }}}"))
-        .returnsOrdered("EXPR$0=3; state=AK",
-            "EXPR$0=3; state=AL",
-            "EXPR$0=3; state=AR");
-
+            "aggregations:{'EXPR$1':{cardinality:{field:'city'}}",
+                "'EXPR$2':{cardinality:{field:'pop'}} "
+                + " }}}"))
+        .returnsOrdered("state=AK; EXPR$1=3; EXPR$2=3",
+            "state=AL; EXPR$1=3; EXPR$2=3",
+            "state=AR; EXPR$1=3; EXPR$2=3");
   }
 
 }