You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/12/02 00:55:04 UTC
[12/15] calcite git commit: [CALCITE-2679] In Elasticsearch adapter,
implement DISTINCT and GROUP BY without aggregate function (Siyuan
Liu)
[CALCITE-2679] In Elasticsearch adapter, implement DISTINCT and GROUP BY without aggregate function (Siyuan Liu)
This commit fixes three bugs:
1. GROUP BY and DISTINCT queries without aggregate functions entered the wrong execution branch.
[ElasticsearchTable:126]
2. Some values in aggregation buckets were lost when returned as results (rows sharing the same bucket key overwrote each other).
[ElasticsearchJson:83-93, 546-551]
3. The logic for removing empty aggregation blocks did not work. [ElasticsearchTable:240-254]
Close apache/calcite#927
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/96605a86
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/96605a86
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/96605a86
Branch: refs/heads/master
Commit: 96605a86cfc4586951700727f12d33fc00c1ce55
Parents: e809344
Author: liusiyuan1 <li...@360.cn>
Authored: Sun Nov 18 23:28:29 2018 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Dec 1 14:42:33 2018 -0800
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchJson.java | 42 ++++++---
.../elasticsearch/ElasticsearchTable.java | 29 ++++--
.../elasticsearch/ElasticSearchAdapterTest.java | 96 +++++++++++++++-----
3 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index e389ecf..dd49dfa 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -27,15 +27,14 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -74,9 +73,17 @@ final class ElasticsearchJson {
rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons));
rows.forEach((k, v) -> {
- Map<String, Object> row = new LinkedHashMap<>(k.keys);
- v.forEach(val -> row.put(val.getName(), val.value()));
- consumer.accept(row);
+ if (v.stream().anyMatch(val -> val instanceof GroupValue)) {
+ v.forEach(tuple -> {
+ Map<String, Object> groupRow = new LinkedHashMap<>(k.keys);
+ groupRow.put(tuple.getName(), tuple.value());
+ consumer.accept(groupRow);
+ });
+ } else {
+ Map<String, Object> row = new LinkedHashMap<>(k.keys);
+ v.forEach(val -> row.put(val.getName(), val.value()));
+ consumer.accept(row);
+ }
});
}
@@ -178,7 +185,7 @@ final class ElasticsearchJson {
Bucket bucket = (Bucket) aggregation;
if (bucket.hasNoAggregations()) {
// bucket with no aggregations is also considered a leaf node
- visitValueNodes(MultiValue.of(bucket.getName(), bucket.key()), parents, consumer);
+ visitValueNodes(GroupValue.of(bucket.getName(), bucket.key()), parents, consumer);
return;
}
parents.add(bucket);
@@ -561,13 +568,23 @@ final class ElasticsearchJson {
return values().get("value");
}
+ }
+
+ /**
+ * Distinguishes from {@link MultiValue}.
+ * In order that rows which have the same key can be put into result map.
+ */
+ static class GroupValue extends MultiValue {
+ GroupValue(String name, Map<String, Object> values) {
+ super(name, values);
+ }
+
/**
- * Constructs a {@link MultiValue} instance with a single value.
+ * Constructs a {@link GroupValue} instance with a single value.
*/
- static MultiValue of(String name, Object value) {
- return new MultiValue(name, Collections.singletonMap("value", value));
+ static GroupValue of(String name, Object value) {
+ return new GroupValue(name, Collections.singletonMap("value", value));
}
-
}
/**
@@ -575,8 +592,9 @@ final class ElasticsearchJson {
*/
static class AggregationsDeserializer extends StdDeserializer<Aggregations> {
- private static final Set<String> IGNORE_TOKENS = new HashSet<>(Arrays.asList("meta",
- "buckets", "value", "values", "value_as_string", "doc_count", "key", "key_as_string"));
+ private static final Set<String> IGNORE_TOKENS =
+ ImmutableSet.of("meta", "buckets", "value", "values", "value_as_string",
+ "doc_count", "key", "key_as_string");
AggregationsDeserializer() {
super(Aggregations.class);
http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index b009fff..e288a16 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -122,7 +123,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
List<Map.Entry<String, String>> aggregations,
Long offset, Long fetch) throws IOException {
- if (!aggregations.isEmpty()) {
+ if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
// process aggregations separately
return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
}
@@ -171,10 +172,6 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
List<Map.Entry<String, String>> aggregations,
Long offset, Long fetch) throws IOException {
- if (aggregations.isEmpty()) {
- throw new IllegalArgumentException("Missing Aggregations");
- }
-
if (!groupBy.isEmpty() && offset != null) {
String message = "Currently ES doesn't support generic pagination "
+ "with aggregations. You can still use LIMIT keyword (without OFFSET). "
@@ -245,12 +242,23 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
}
}
+ final Consumer<JsonNode> emptyAggRemover = new Consumer<JsonNode>() {
+ @Override public void accept(JsonNode node) {
+ if (!node.has(AGGREGATIONS)) {
+ node.elements().forEachRemaining(this);
+ return;
+ }
+ JsonNode agg = node.get(AGGREGATIONS);
+ if (agg.size() == 0) {
+ ((ObjectNode) node).remove(AGGREGATIONS);
+ } else {
+ this.accept(agg);
+ }
+ }
+ };
+
// cleanup query. remove empty AGGREGATIONS element (if empty)
- JsonNode agg = query;
- while (agg.has(AGGREGATIONS) && agg.get(AGGREGATIONS).elements().hasNext()) {
- agg = agg.get(AGGREGATIONS);
- }
- ((ObjectNode) agg).remove(AGGREGATIONS);
+ emptyAggRemover.accept(query);
ElasticsearchJson.Result res = transport.search(Collections.emptyMap()).apply(query);
@@ -258,6 +266,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
if (res.aggregations() != null) {
// collect values
ElasticsearchJson.visitValueNodes(res.aggregations(), m -> {
+ // using 'Collectors.toMap' will trigger Java 8 bug here
Map<String, Object> newMap = new LinkedHashMap<>();
for (String key: m.keySet()) {
newMap.put(fieldMap.getOrDefault(key, key), m.get(key));
http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 73cf6bf..8b09b88 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -458,62 +458,108 @@ public class ElasticSearchAdapterTest {
@Test
public void groupBy() {
+ // distinct
+ calciteAssert()
+ .query("select distinct state\n"
+ + "from zips\n"
+ + "limit 6")
+ .queryContains(
+ ElasticsearchChecker.elasticsearchChecker("_source:false",
+ "size:0",
+ "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__', 'size' : 6}}}"))
+ .returnsOrdered("state=AK",
+ "state=AL",
+ "state=AR",
+ "state=AZ",
+ "state=CA",
+ "state=CO");
+
+ // without aggregate function
+ calciteAssert()
+ .query("select state, city\n"
+ + "from zips\n"
+ + "group by state, city\n"
+ + "order by city limit 10")
+ .queryContains(
+ ElasticsearchChecker.elasticsearchChecker("'_source':false",
+ "size:0",
+ "aggregations:{'g_city':{'terms':{'field':'city','missing':'__MISSING__','size':10,'order':{'_key':'asc'}}",
+ "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__','size':10}}}}}}"))
+ .returnsOrdered("state=SD; city=ABERDEEN",
+ "state=SC; city=AIKEN",
+ "state=TX; city=ALTON",
+ "state=IA; city=AMES",
+ "state=AK; city=ANCHORAGE",
+ "state=MD; city=BALTIMORE",
+ "state=ME; city=BANGOR",
+ "state=KS; city=BAVARIA",
+ "state=NJ; city=BAYONNE",
+ "state=OR; city=BEAVERTON");
+
// ascending
calciteAssert()
- .query("select min(pop), max(pop), state from zips group by state "
+ .query("select min(pop), max(pop), state\n"
+ + "from zips\n"
+ + "group by state\n"
+ "order by state limit 3")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0",
- "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3,"
- + " order:{'_key':'asc'}}",
- "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}"))
+ "size:0",
+ "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3,"
+ + " order:{'_key':'asc'}}",
+ "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}"))
.returnsOrdered("EXPR$0=23238; EXPR$1=32383; state=AK",
"EXPR$0=42124; EXPR$1=44165; state=AL",
"EXPR$0=37428; EXPR$1=53532; state=AR");
// just one aggregation function
calciteAssert()
- .query("select min(pop), state from zips group by state"
- + " order by state limit 3")
+ .query("select min(pop), state\n"
+ + "from zips\n"
+ + "group by state\n"
+ + "order by state limit 3")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0",
- "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
- + "size:3, order:{'_key':'asc'}}",
- "aggregations:{'EXPR$0':{min:{field:'pop'}} }}}"))
+ "size:0",
+ "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+ + "size:3, order:{'_key':'asc'}}",
+ "aggregations:{'EXPR$0':{min:{field:'pop'}} }}}"))
.returnsOrdered("EXPR$0=23238; state=AK",
"EXPR$0=42124; state=AL",
"EXPR$0=37428; state=AR");
// group by count
calciteAssert()
- .query("select count(city), state from zips group by state "
+ .query("select count(city), state\n"
+ + "from zips\n"
+ + "group by state\n"
+ "order by state limit 3")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0",
- "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
- + " size:3, order:{'_key':'asc'}}",
- "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}"))
+ "size:0",
+ "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+ + " size:3, order:{'_key':'asc'}}",
+ "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}"))
.returnsOrdered("EXPR$0=3; state=AK",
"EXPR$0=3; state=AL",
"EXPR$0=3; state=AR");
// descending
calciteAssert()
- .query("select min(pop), max(pop), state from zips group by state "
- + " order by state desc limit 3")
+ .query("select min(pop), max(pop), state\n"
+ + "from zips\n"
+ + "group by state\n"
+ + "order by state desc limit 3")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0",
- "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
- + "size:3, order:{'_key':'desc'}}",
- "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':"
- + "{max:{field:'pop'}}}}}"))
+ "size:0",
+ "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+ + "size:3, order:{'_key':'desc'}}",
+ "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':"
+ + "{max:{field:'pop'}}}}}"))
.returnsOrdered("EXPR$0=25968; EXPR$1=33107; state=WY",
- "EXPR$0=45196; EXPR$1=70185; state=WV",
- "EXPR$0=51008; EXPR$1=57187; state=WI");
+ "EXPR$0=45196; EXPR$1=70185; state=WV",
+ "EXPR$0=51008; EXPR$1=57187; state=WI");
}
/**