You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by se...@apache.org on 2019/01/23 02:53:50 UTC
[calcite] branch master updated: [CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter
This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new fc10982 [CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter
fc10982 is described below
commit fc10982bc856a01c23928de15441822de2d3ffde
Author: Andrei Sereda <25...@users.noreply.github.com>
AuthorDate: Thu Jan 17 18:21:14 2019 -0500
[CALCITE-2797] Support APPROX_COUNT_DISTINCT aggregate function in ElasticSearch adapter
Convert the approximate distinct-count function `APPROX_COUNT_DISTINCT` into the Elasticsearch
[cardinality](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html)
single-value metrics aggregation.
Remove AggregateExpandDistinctAggregatesRule from the Elasticsearch planner so that the `APPROX_COUNT_DISTINCT`
function can be correctly identified (otherwise the rule rewrites it into two nested aggregations).
---
.../elasticsearch/ElasticsearchAggregate.java | 40 ++++++++++++++--------
.../adapter/elasticsearch/ElasticsearchTable.java | 2 +-
.../elasticsearch/ElasticsearchTableScan.java | 5 +++
.../adapter/elasticsearch/AggregationTest.java | 30 ++++++++++++++++
.../elasticsearch/ElasticSearchAdapterTest.java | 29 +++++++---------
5 files changed, 74 insertions(+), 32 deletions(-)
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
index e2810f5..8ca1397 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -28,8 +28,12 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ImmutableBitSet;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -68,11 +72,14 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
assert this.groupSets.size() == 1 : "Grouping sets not supported";
for (AggregateCall aggCall : aggCalls) {
- if (aggCall.isDistinct()) {
- throw new InvalidRelException("distinct aggregation not supported");
+ if (aggCall.isDistinct() && !aggCall.isApproximate()) {
+ final String message = String.format(Locale.ROOT, "Only approximate distinct "
+ + "aggregations are supported in Elastic (cardinality aggregation). Use %s function",
+ SqlStdOperatorTable.APPROX_COUNT_DISTINCT.getName());
+ throw new InvalidRelException(message);
}
- SqlKind kind = aggCall.getAggregation().getKind();
+ final SqlKind kind = aggCall.getAggregation().getKind();
if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
final String message = String.format(Locale.ROOT,
"Aggregation %s not supported (use one of %s)", kind, SUPPORTED_AGGREGATIONS);
@@ -111,22 +118,24 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
implementor.addGroupBy(inputFields.get(group));
}
+ final ObjectMapper mapper = implementor.elasticsearchTable.mapper;
+
for (AggregateCall aggCall : aggCalls) {
- List<String> names = new ArrayList<>();
+ final List<String> names = new ArrayList<>();
for (int i : aggCall.getArgList()) {
names.add(inputFields.get(i));
}
- final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
- // for ANY_VALUE return just a single result
- final String size = aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE ? ", \"size\": 1"
- : "";
+ final ObjectNode aggregation = mapper.createObjectNode();
+ final ObjectNode field = aggregation.with(toElasticAggregate(aggCall));
- final String op = String.format(Locale.ROOT, "{\"%s\":{\"field\": \"%s\" %s}}",
- toElasticAggregate(aggCall),
- name, size);
+ final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
+ field.put("field", name);
+ if (aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE) {
+ field.put("size", 1);
+ }
- implementor.addAggregation(aggCall.getName(), op);
+ implementor.addAggregation(aggCall.getName(), aggregation.toString());
}
}
@@ -136,11 +145,12 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
* function. But currently only one-to-one mapping is supported between sql agg and elastic
* aggregation.
*/
- private String toElasticAggregate(AggregateCall call) {
- SqlKind kind = call.getAggregation().getKind();
+ private static String toElasticAggregate(AggregateCall call) {
+ final SqlKind kind = call.getAggregation().getKind();
switch (kind) {
case COUNT:
- return call.isApproximate() ? "cardinality" : "value_count";
+ // approx_count_distinct() vs count()
+ return call.isDistinct() && call.isApproximate() ? "cardinality" : "value_count";
case SUM:
return "sum";
case MIN:
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index 0b32f89..f4d0028 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -74,7 +74,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
private final String indexName;
private final String typeName;
final ObjectMapper mapper;
- private final ElasticsearchTransport transport;
+ final ElasticsearchTransport transport;
/**
* Creates an ElasticsearchTable.
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
index 3dd041a..729c266 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
import org.apache.calcite.rel.type.RelDataType;
import java.util.List;
@@ -78,6 +79,10 @@ public class ElasticsearchTableScan extends TableScan implements ElasticsearchRe
for (RelOptRule rule: ElasticsearchRules.RULES) {
planner.addRule(rule);
}
+
+ // remove this rule otherwise elastic can't correctly interpret approx_count_distinct()
+ // it is converted to cardinality aggregation in Elastic
+ planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
}
@Override public void implement(Implementor implementor) {
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index 0586e88..ecbcc23 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -299,6 +299,36 @@ public class AggregationTest {
"cat5=null; EXPR$1=0.0",
"cat5=2; EXPR$1=7.0");
}
+
+ /**
+ * Validate {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#APPROX_COUNT_DISTINCT}.
+ */
+ @Test
+ public void approximateCountDistinct() {
+ // approx_count_distinct counts distinct *non-null* values
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select approx_count_distinct(cat1) from view")
+ .returnsUnordered("EXPR$0=2");
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select approx_count_distinct(cat2) from view")
+ .returnsUnordered("EXPR$0=2");
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select cat1, approx_count_distinct(val1) from view group by cat1")
+ .returnsUnordered("cat1=a; EXPR$1=1",
+ "cat1=b; EXPR$1=1",
+ "cat1=null; EXPR$1=0");
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select cat1, approx_count_distinct(val2) from view group by cat1")
+ .returnsUnordered("cat1=a; EXPR$1=0",
+ "cat1=b; EXPR$1=1",
+ "cat1=null; EXPR$1=1");
+ }
}
// End AggregationTest.java
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index f5a1265..507d106 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -31,7 +31,6 @@ import com.google.common.io.Resources;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -647,29 +646,27 @@ public class ElasticSearchAdapterTest {
}
/**
- * Checks
- * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html">Cardinality</a>
- * aggregation {@code approx_count_distinct}
+ * Test of {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#APPROX_COUNT_DISTINCT} which
+ * will be translated to
+ * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html">Cardinality Aggregation</a>
+ * (approximate counts using HyperLogLog++ algorithm).
*/
@Test
- @Ignore
public void approximateCount() throws Exception {
- // approx_count_distinct is converted into two aggregations. needs investigation
- // ElasticsearchAggregate(group=[{1}], EXPR$0=[COUNT($0)])\r
- // ElasticsearchAggregate(group=[{0, 1}])\r
calciteAssert()
- .query("select approx_count_distinct(city), state from zips group by state "
- + "order by state limit 3")
+ .query("select state, approx_count_distinct(city), approx_count_distinct(pop) from zips"
+ + " group by state order by state limit 3")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
"size:0",
- "aggregations:{'g_state':{terms:{field:state, size:3, "
+ "aggregations:{'g_state':{terms:{field:'state', missing:'__MISSING__', size:3, "
+ "order:{'_key':'asc'}}",
- "aggregations:{'EXPR$0':{cardinality:{field:city}} }}}"))
- .returnsOrdered("EXPR$0=3; state=AK",
- "EXPR$0=3; state=AL",
- "EXPR$0=3; state=AR");
-
+ "aggregations:{'EXPR$1':{cardinality:{field:'city'}}",
+ "'EXPR$2':{cardinality:{field:'pop'}} "
+ + " }}}"))
+ .returnsOrdered("state=AK; EXPR$1=3; EXPR$2=3",
+ "state=AL; EXPR$1=3; EXPR$2=3",
+ "state=AR; EXPR$1=3; EXPR$2=3");
}
}