You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/12/02 00:55:04 UTC

[12/15] calcite git commit: [CALCITE-2679] In Elasticsearch adapter, implement DISTINCT and GROUP BY without aggregate function (Siyuan Liu)

[CALCITE-2679] In Elasticsearch adapter, implement DISTINCT and GROUP BY without aggregate function (Siyuan Liu)

This commit mainly fixed 3 bugs:
1. Group by and distinct query enter the wrong execution branch.
   [ElasticsearchTable:126]
2. Values in agg bucket loses a part after returning as a result.
   [ElasticsearchJson:83-93, 546-551]
3. Logic of removing empty agg blocks can not work. [Elasticsearch:240-254]

Close apache/calcite#927


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/96605a86
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/96605a86
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/96605a86

Branch: refs/heads/master
Commit: 96605a86cfc4586951700727f12d33fc00c1ce55
Parents: e809344
Author: liusiyuan1 <li...@360.cn>
Authored: Sun Nov 18 23:28:29 2018 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Dec 1 14:42:33 2018 -0800

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchJson.java        | 42 ++++++---
 .../elasticsearch/ElasticsearchTable.java       | 29 ++++--
 .../elasticsearch/ElasticSearchAdapterTest.java | 96 +++++++++++++++-----
 3 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index e389ecf..dd49dfa 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -27,15 +27,14 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableSet;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -74,9 +73,17 @@ final class ElasticsearchJson {
         rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
     aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons));
     rows.forEach((k, v) -> {
-      Map<String, Object> row = new LinkedHashMap<>(k.keys);
-      v.forEach(val -> row.put(val.getName(), val.value()));
-      consumer.accept(row);
+      if (v.stream().anyMatch(val -> val instanceof GroupValue)) {
+        v.forEach(tuple -> {
+          Map<String, Object> groupRow = new LinkedHashMap<>(k.keys);
+          groupRow.put(tuple.getName(), tuple.value());
+          consumer.accept(groupRow);
+        });
+      } else {
+        Map<String, Object> row = new LinkedHashMap<>(k.keys);
+        v.forEach(val -> row.put(val.getName(), val.value()));
+        consumer.accept(row);
+      }
     });
   }
 
@@ -178,7 +185,7 @@ final class ElasticsearchJson {
       Bucket bucket = (Bucket) aggregation;
       if (bucket.hasNoAggregations()) {
         // bucket with no aggregations is also considered a leaf node
-        visitValueNodes(MultiValue.of(bucket.getName(), bucket.key()), parents, consumer);
+        visitValueNodes(GroupValue.of(bucket.getName(), bucket.key()), parents, consumer);
         return;
       }
       parents.add(bucket);
@@ -561,13 +568,23 @@ final class ElasticsearchJson {
       return values().get("value");
     }
 
+  }
+
+  /**
+   * Distinguishes from {@link MultiValue}.
+   * In order that rows which have the same key can be put into result map.
+   */
+  static class GroupValue extends MultiValue {
+    GroupValue(String name, Map<String, Object> values) {
+      super(name, values);
+    }
+
     /**
-     * Constructs a {@link MultiValue} instance with a single value.
+     * Constructs a {@link GroupValue} instance with a single value.
      */
-    static MultiValue of(String name, Object value) {
-      return new MultiValue(name, Collections.singletonMap("value", value));
+    static GroupValue of(String name, Object value) {
+      return new GroupValue(name, Collections.singletonMap("value", value));
     }
-
   }
 
   /**
@@ -575,8 +592,9 @@ final class ElasticsearchJson {
    */
   static class AggregationsDeserializer extends StdDeserializer<Aggregations> {
 
-    private static final Set<String> IGNORE_TOKENS = new HashSet<>(Arrays.asList("meta",
-        "buckets", "value", "values", "value_as_string", "doc_count", "key", "key_as_string"));
+    private static final Set<String> IGNORE_TOKENS =
+        ImmutableSet.of("meta", "buckets", "value", "values", "value_as_string",
+            "doc_count", "key", "key_as_string");
 
     AggregationsDeserializer() {
       super(Aggregations.class);

http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index b009fff..e288a16 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -122,7 +123,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       List<Map.Entry<String, String>> aggregations,
       Long offset, Long fetch) throws IOException {
 
-    if (!aggregations.isEmpty()) {
+    if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
       // process aggregations separately
       return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
     }
@@ -171,10 +172,6 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       List<Map.Entry<String, String>> aggregations,
       Long offset, Long fetch) throws IOException {
 
-    if (aggregations.isEmpty()) {
-      throw new IllegalArgumentException("Missing Aggregations");
-    }
-
     if (!groupBy.isEmpty() && offset != null) {
       String message = "Currently ES doesn't support generic pagination "
           + "with aggregations. You can still use LIMIT keyword (without OFFSET). "
@@ -245,12 +242,23 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       }
     }
 
+    final Consumer<JsonNode> emptyAggRemover = new Consumer<JsonNode>() {
+      @Override public void accept(JsonNode node) {
+        if (!node.has(AGGREGATIONS)) {
+          node.elements().forEachRemaining(this);
+          return;
+        }
+        JsonNode agg = node.get(AGGREGATIONS);
+        if (agg.size() == 0) {
+          ((ObjectNode) node).remove(AGGREGATIONS);
+        } else {
+          this.accept(agg);
+        }
+      }
+    };
+
     // cleanup query. remove empty AGGREGATIONS element (if empty)
-    JsonNode agg = query;
-    while (agg.has(AGGREGATIONS) && agg.get(AGGREGATIONS).elements().hasNext()) {
-      agg = agg.get(AGGREGATIONS);
-    }
-    ((ObjectNode) agg).remove(AGGREGATIONS);
+    emptyAggRemover.accept(query);
 
     ElasticsearchJson.Result res = transport.search(Collections.emptyMap()).apply(query);
 
@@ -258,6 +266,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
     if (res.aggregations() != null) {
       // collect values
       ElasticsearchJson.visitValueNodes(res.aggregations(), m -> {
+        // using 'Collectors.toMap' will trigger Java 8 bug here
         Map<String, Object> newMap = new LinkedHashMap<>();
         for (String key: m.keySet()) {
           newMap.put(fieldMap.getOrDefault(key, key), m.get(key));

http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 73cf6bf..8b09b88 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -458,62 +458,108 @@ public class ElasticSearchAdapterTest {
 
   @Test
   public void groupBy() {
+    // distinct
+    calciteAssert()
+        .query("select distinct state\n"
+            + "from zips\n"
+            + "limit 6")
+        .queryContains(
+            ElasticsearchChecker.elasticsearchChecker("_source:false",
+                "size:0",
+                "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__', 'size' : 6}}}"))
+        .returnsOrdered("state=AK",
+            "state=AL",
+            "state=AR",
+            "state=AZ",
+            "state=CA",
+            "state=CO");
+
+    // without aggregate function
+    calciteAssert()
+        .query("select state, city\n"
+            + "from zips\n"
+            + "group by state, city\n"
+            + "order by city limit 10")
+        .queryContains(
+            ElasticsearchChecker.elasticsearchChecker("'_source':false",
+                "size:0",
+                "aggregations:{'g_city':{'terms':{'field':'city','missing':'__MISSING__','size':10,'order':{'_key':'asc'}}",
+                "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__','size':10}}}}}}"))
+        .returnsOrdered("state=SD; city=ABERDEEN",
+            "state=SC; city=AIKEN",
+            "state=TX; city=ALTON",
+            "state=IA; city=AMES",
+            "state=AK; city=ANCHORAGE",
+            "state=MD; city=BALTIMORE",
+            "state=ME; city=BANGOR",
+            "state=KS; city=BAVARIA",
+            "state=NJ; city=BAYONNE",
+            "state=OR; city=BEAVERTON");
+
     // ascending
     calciteAssert()
-        .query("select min(pop), max(pop), state from zips group by state "
+        .query("select min(pop), max(pop), state\n"
+            + "from zips\n"
+            + "group by state\n"
             + "order by state limit 3")
         .queryContains(
             ElasticsearchChecker.elasticsearchChecker("'_source':false",
-            "size:0",
-            "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3,"
-                + " order:{'_key':'asc'}}",
-            "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}"))
+                "size:0",
+                "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3,"
+                    + " order:{'_key':'asc'}}",
+                "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}"))
         .returnsOrdered("EXPR$0=23238; EXPR$1=32383; state=AK",
             "EXPR$0=42124; EXPR$1=44165; state=AL",
             "EXPR$0=37428; EXPR$1=53532; state=AR");
 
     // just one aggregation function
     calciteAssert()
-        .query("select min(pop), state from zips group by state"
-            + " order by state limit 3")
+        .query("select min(pop), state\n"
+            + "from zips\n"
+            + "group by state\n"
+            + "order by state limit 3")
         .queryContains(
             ElasticsearchChecker.elasticsearchChecker("'_source':false",
-            "size:0",
-            "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
-                + "size:3, order:{'_key':'asc'}}",
-            "aggregations:{'EXPR$0':{min:{field:'pop'}} }}}"))
+                "size:0",
+                "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+                    + "size:3, order:{'_key':'asc'}}",
+                "aggregations:{'EXPR$0':{min:{field:'pop'}} }}}"))
         .returnsOrdered("EXPR$0=23238; state=AK",
             "EXPR$0=42124; state=AL",
             "EXPR$0=37428; state=AR");
 
     // group by count
     calciteAssert()
-        .query("select count(city), state from zips group by state "
+        .query("select count(city), state\n"
+            + "from zips\n"
+            + "group by state\n"
             + "order by state limit 3")
         .queryContains(
             ElasticsearchChecker.elasticsearchChecker("'_source':false",
-            "size:0",
-            "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
-                + " size:3, order:{'_key':'asc'}}",
-            "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}"))
+                "size:0",
+                "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+                    + " size:3, order:{'_key':'asc'}}",
+                "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}"))
         .returnsOrdered("EXPR$0=3; state=AK",
             "EXPR$0=3; state=AL",
             "EXPR$0=3; state=AR");
 
     // descending
     calciteAssert()
-        .query("select min(pop), max(pop), state from zips group by state "
-            + " order by state desc limit 3")
+        .query("select min(pop), max(pop), state\n"
+            + "from zips\n"
+            + "group by state\n"
+            + "order by state desc limit 3")
         .queryContains(
             ElasticsearchChecker.elasticsearchChecker("'_source':false",
-            "size:0",
-            "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
-                + "size:3, order:{'_key':'desc'}}",
-            "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':"
-                + "{max:{field:'pop'}}}}}"))
+                "size:0",
+                "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',"
+                    + "size:3, order:{'_key':'desc'}}",
+                "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':"
+                    + "{max:{field:'pop'}}}}}"))
         .returnsOrdered("EXPR$0=25968; EXPR$1=33107; state=WY",
-                        "EXPR$0=45196; EXPR$1=70185; state=WV",
-                        "EXPR$0=51008; EXPR$1=57187; state=WI");
+            "EXPR$0=45196; EXPR$1=70185; state=WV",
+            "EXPR$0=51008; EXPR$1=57187; state=WI");
   }
 
   /**