You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/12/02 00:54:54 UTC

[02/15] calcite git commit: [CALCITE-2689] In ElasticSearch adapter, allow grouping on non-textual fields like date and number

[CALCITE-2689] In ElasticSearch adapter, allow grouping on non-textual fields like date and number

Consider field type when populating `missing` (value) in Elasticsearch
terms aggregations.

Close apache/calcite#946


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ed3da62d
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ed3da62d
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ed3da62d

Branch: refs/heads/master
Commit: ed3da62d75ead6c00bb73470c3436e51f6f77197
Parents: 08aefb0
Author: Andrei Sereda <25...@users.noreply.github.com>
Authored: Tue Nov 20 00:10:47 2018 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Nov 30 17:55:09 2018 -0800

----------------------------------------------------------------------
 elasticsearch/pom.xml                           |   5 +
 .../elasticsearch/ElasticsearchJson.java        |  67 +++++--
 .../elasticsearch/ElasticsearchMapping.java     | 188 +++++++++++++++++++
 .../elasticsearch/ElasticsearchTable.java       |  13 +-
 .../elasticsearch/ElasticsearchTransport.java   |  17 ++
 .../adapter/elasticsearch/Scrolling.java        |   1 -
 .../adapter/elasticsearch/AggregationTest.java  |  77 ++++++--
 .../adapter/elasticsearch/BooleanLogicTest.java |   7 +-
 .../adapter/elasticsearch/ScrollingTest.java    |   1 -
 9 files changed, 338 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index ec9a0e0..86248ba 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -68,6 +68,11 @@ limitations under the License.
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index 8a6b011..e389ecf 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -26,14 +26,15 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -50,7 +51,7 @@ import java.util.stream.StreamSupport;
 import static java.util.Collections.unmodifiableMap;
 
 /**
- * Internal objects (and deserializers) used to parse elastic search results
+ * Internal objects (and deserializers) used to parse Elasticsearch results
  * (which are in JSON format).
  *
  * <p>Since we're using basic row-level rest client http response has to be
@@ -58,13 +59,6 @@ import static java.util.Collections.unmodifiableMap;
  */
 final class ElasticsearchJson {
 
-  /**
-   * Used as special aggregation key for missing values (documents which are missing a field).
-   * Buckets with that value are then converted to {@code null}s in flat tabular format.
-   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing Value</a>
-   */
-  static final JsonNode MISSING_VALUE = JsonNodeFactory.instance.textNode("__MISSING__");
-
   private ElasticsearchJson() {}
 
   /**
@@ -87,6 +81,50 @@ final class ElasticsearchJson {
   }
 
   /**
+   * Visits Elasticsearch
+   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html">mapping
+   * properties</a> and calls consumer for each {@code field / type} pair.
+   * Nested fields are represented as {@code foo.bar.qux}.
+   */
+  static void visitMappingProperties(ObjectNode mapping,
+      BiConsumer<String, String> consumer) {
+    Objects.requireNonNull(mapping, "mapping");
+    Objects.requireNonNull(consumer, "consumer");
+    visitMappingProperties(new ArrayDeque<>(), mapping, consumer);
+  }
+
+  private static void visitMappingProperties(Deque<String> path,
+      ObjectNode mapping, BiConsumer<String, String> consumer) {
+    Objects.requireNonNull(mapping, "mapping");
+    if (mapping.isMissingNode()) {
+      return;
+    }
+
+    if (mapping.has("properties")) {
+      // recurse
+      visitMappingProperties(path, (ObjectNode) mapping.get("properties"), consumer);
+      return;
+    }
+
+    if (mapping.has("type")) {
+      // this is leaf (register field / type mapping)
+      consumer.accept(String.join(".", path), mapping.get("type").asText());
+      return;
+    }
+
+    // otherwise continue visiting mapping(s)
+    Iterable<Map.Entry<String, JsonNode>> iter = mapping::fields;
+    for (Map.Entry<String, JsonNode> entry : iter) {
+      final String name = entry.getKey();
+      final ObjectNode node = (ObjectNode) entry.getValue();
+      path.add(name);
+      visitMappingProperties(path, node, consumer);
+      path.removeLast();
+    }
+  }
+
+
+  /**
    * Identifies a calcite row (as in relational algebra)
    */
   private static class RowKey {
@@ -601,19 +639,24 @@ final class ElasticsearchJson {
      * Determines if current key is a missing field key. Missing key is returned when document
      * does not have pivoting attribute (example {@code GROUP BY _MAP['a.b.missing']}). It helps
      * grouping documents which don't have a field. In relational algebra this
-     * would be {@code null}.
+     * would normally be {@code null}.
+     *
+     * <p>Please note that missing value is different for each type.
      *
      * @param key current {@code key} (usually string) as returned by ES
      * @return {@code true} if this value
-     * @see #MISSING_VALUE
      */
     private static boolean isMissingBucket(JsonNode key) {
-      return MISSING_VALUE.equals(key);
+      return ElasticsearchMapping.Datatype.isMissingValue(key);
     }
 
     private static Bucket parseBucket(JsonParser parser, String name, ObjectNode node)
         throws JsonProcessingException  {
 
+      if (!node.has("key")) {
+        throw new IllegalArgumentException("No 'key' attribute for " + node);
+      }
+
       final JsonNode keyNode = node.get("key");
       final Object key;
       if (isMissingBucket(keyNode) || keyNode.isNull()) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
new file mode 100644
index 0000000..93a8049
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.google.common.collect.ImmutableMap;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+
+/**
+ * Stores Elasticsearch
+ * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html">
+ * mapping</a> information for particular index/type. This information is
+ * extracted from {@code /$index/$type/_mapping} endpoint.
+ *
+ * <p>Instances of this class are immutable.
+ */
+class ElasticsearchMapping {
+
+  private final String index;
+
+  private final String type;
+
+  private final Map<String, Datatype> mapping;
+
+  ElasticsearchMapping(final String index, final String type,
+      final Map<String, String> mapping) {
+    this.index = Objects.requireNonNull(index, "index");
+    this.type = Objects.requireNonNull(type, "type");
+    Objects.requireNonNull(mapping, "mapping");
+
+    final Map<String, Datatype> transformed = mapping.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> new Datatype(e.getValue())));
+    this.mapping = ImmutableMap.copyOf(transformed);
+  }
+
+  /**
+   * Returns ES schema for each field. Mapping is represented as field name
+   * {@code foo.bar.qux} and type ({@code keyword}, {@code boolean},
+   * {@code long}).
+   *
+   * @return immutable mapping between field and ES type
+   *
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html">Mapping Types</a>
+   */
+  Map<String, Datatype> mapping() {
+    return this.mapping;
+  }
+
+  /**
+   * Used as special aggregation key for missing values (documents that are
+   * missing a field).
+   *
+   * <p>Buckets with that value are then converted to {@code null}s in flat
+   * tabular format.
+   *
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing Value</a>
+   */
+  Optional<JsonNode> missingValueFor(String fieldName) {
+    if (!mapping().containsKey(fieldName)) {
+      final String message = String.format(Locale.ROOT,
+          "Field %s not defined for %s/%s", fieldName, index, type);
+      throw new IllegalArgumentException(message);
+    }
+
+    return mapping().get(fieldName).missingValue();
+  }
+
+  String index() {
+    return this.index;
+  }
+
+  String type() {
+    return this.type;
+  }
+
+  /**
+   * Represents elastic data-type, like {@code long}, {@code keyword},
+   * {@code date} etc.
+   *
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html">Mapping Types</a>
+   */
+  static class Datatype {
+    private static final JsonNodeFactory FACTORY = JsonNodeFactory.instance;
+
+    // pre-cache missing values
+    private static final Set<JsonNode> MISSING_VALUES =
+        Stream.of("string", // for ES2
+            "text", "keyword",
+            "date", "long", "integer", "double", "float")
+            .map(Datatype::missingValueForType)
+            .collect(Collectors.toSet());
+
+    private final String name;
+    private final JsonNode missingValue;
+
+    private Datatype(final String name) {
+      this.name = Objects.requireNonNull(name, "name");
+      this.missingValue = missingValueForType(name);
+    }
+
+    /**
+     * Mapping between ES type and json value that represents
+     * {@code missing value} during aggregations. This value can't be
+     * {@code null} and should match type or the field (for ES long type it
+     * also has to be json integer, for date it has to match date format or be
+     * integer (millis epoch) etc.
+     *
+     * <p>It is used for terms aggregations to represent SQL {@code null}.
+     *
+     * @param name name of the type ({@code long}, {@code keyword} ...)
+     *
+     * @return json that will be used in elastic search terms aggregation for
+     * missing value
+     *
+     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#_missing_value_13">Missing Value</a>
+     */
+    private static @Nullable JsonNode missingValueForType(String name) {
+      switch (name) {
+      case "string": // for ES2
+      case "text":
+      case "keyword":
+        return FACTORY.textNode("__MISSING__");
+      case "long":
+        return FACTORY.numberNode(Long.MIN_VALUE);
+      case "integer":
+        return FACTORY.numberNode(Integer.MIN_VALUE);
+      case "short":
+        return FACTORY.numberNode(Short.MIN_VALUE);
+      case "double":
+        return FACTORY.numberNode(Double.MIN_VALUE);
+      case "float":
+        return FACTORY.numberNode(Float.MIN_VALUE);
+      case "date":
+        // sentinel for missing dates: 9999-12-31
+        final long millisEpoch = LocalDate.of(9999, 12, 31)
+            .atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
+        // by default elastic returns dates as longs
+        return FACTORY.numberNode(millisEpoch);
+      }
+
+      // this is unknown type
+      return null;
+    }
+
+    /**
+     * Name of the type: {@code text}, {@code integer}, {@code float} etc.
+     */
+    String name() {
+      return this.name;
+    }
+
+    Optional<JsonNode> missingValue() {
+      return Optional.ofNullable(missingValue);
+    }
+
+    static boolean isMissingValue(JsonNode node) {
+      return MISSING_VALUES.contains(node);
+    }
+  }
+
+}
+
+// End ElasticsearchMapping.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index f9565ec..b009fff 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -219,15 +219,20 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       final ObjectNode section = parent.with(aggName);
       final ObjectNode terms = section.with("terms");
       terms.put("field", name);
-      terms.set("missing", ElasticsearchJson.MISSING_VALUE); // expose missing terms
+
+      transport.mapping.missingValueFor(name).ifPresent(m -> {
+        // expose missing terms. each type has a different missing value
+        terms.set("missing", m);
+      });
 
       if (fetch != null) {
         terms.put("size", fetch);
       }
 
-      sort.stream().filter(e -> e.getKey().equals(name)).findAny().ifPresent(s -> {
-        terms.with("order").put("_key", s.getValue().isDescending() ? "desc" : "asc");
-      });
+      sort.stream().filter(e -> e.getKey().equals(name)).findAny()
+          .ifPresent(s ->
+              terms.with("order")
+                  .put("_key", s.getValue().isDescending() ? "desc" : "asc"));
 
       parent = section.with(AGGREGATIONS);
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
index 12173d8..0c9c06a 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
@@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.collect.ImmutableMap;
 
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
@@ -72,6 +73,8 @@ final class ElasticsearchTransport {
 
   final ElasticsearchVersion version;
 
+  final ElasticsearchMapping mapping;
+
   /**
    * Default batch size
    * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html">Scrolling API</a>
@@ -89,6 +92,7 @@ final class ElasticsearchTransport {
     this.typeName = Objects.requireNonNull(typeName, "typeName");
     this.fetchSize = fetchSize;
     this.version = version(); // cache version
+    this.mapping = fetchAndCreateMapping(); // cache mapping
   }
 
   RestClient restClient() {
@@ -112,6 +116,19 @@ final class ElasticsearchTransport {
         .apply(request);
   }
 
+  /**
+   * Build index mapping returning new instance of {@link ElasticsearchMapping}.
+   */
+  private ElasticsearchMapping fetchAndCreateMapping() {
+    final String uri = String.format(Locale.ROOT, "/%s/%s/_mapping", indexName, typeName);
+    final ObjectNode root = rawHttp(ObjectNode.class).apply(new HttpGet(uri));
+    ObjectNode properties = (ObjectNode) root.elements().next().get("mappings").elements().next();
+
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    ElasticsearchJson.visitMappingProperties(properties, builder::put);
+    return new ElasticsearchMapping(indexName, typeName, builder.build());
+  }
+
   ObjectMapper mapper() {
     return mapper;
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
index f947ae5..85850d8 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/Scrolling.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.calcite.adapter.elasticsearch;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index d06cc7d..0586e88 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -23,6 +23,8 @@ import org.apache.calcite.schema.impl.ViewTableMacro;
 import org.apache.calcite.test.CalciteAssert;
 import org.apache.calcite.test.ElasticsearchChecker;
 
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableMap;
 
@@ -41,7 +43,7 @@ import java.util.Locale;
 import java.util.Map;
 
 /**
- * Testing Elastic Search aggregation transformations.
+ * Testing Elasticsearch aggregation transformations.
  */
 public class AggregationTest {
 
@@ -53,20 +55,29 @@ public class AggregationTest {
   @BeforeClass
   public static void setupInstance() throws Exception {
 
-    final Map<String, String> mappings = ImmutableMap.of("cat1", "keyword",
-        "cat2", "keyword", "cat3", "keyword",
-        "val1", "long", "val2", "long");
+    final Map<String, String> mappings = ImmutableMap.<String, String>builder()
+        .put("cat1", "keyword")
+        .put("cat2", "keyword")
+        .put("cat3", "keyword")
+        .put("cat4", "date")
+        .put("cat5", "integer")
+        .put("val1", "long")
+        .put("val2", "long")
+        .build();
 
     NODE.createIndex(NAME, mappings);
 
-    String doc1 = "{'cat1': 'a', 'cat2': 'g', 'val1': 1 }".replace('\'', '"');
-    String doc2 = "{'cat2': 'g', 'cat3': 'y', 'val2': 5 }".replace('\'', '"');
-    String doc3 = "{'cat1': 'b', 'cat2':'h', 'cat3': 'z', 'val1': 7, 'val2': '42'}"
-        .replace('\'', '"');
+    String doc1 = "{cat1:'a', cat2:'g', val1:1, cat4:'2018-01-01', cat5:1}";
+    String doc2 = "{cat2:'g', cat3:'y', val2:5, cat4:'2019-12-12'}";
+    String doc3 = "{cat1:'b', cat2:'h', cat3:'z', cat5:2, val1:7, val2:42}";
 
-    List<ObjectNode> docs = new ArrayList<>();
+    final ObjectMapper mapper = new ObjectMapper()
+        .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES) // user-friendly settings to
+        .enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES); // avoid too much quoting
+
+    final List<ObjectNode> docs = new ArrayList<>();
     for (String text: Arrays.asList(doc1, doc2, doc3)) {
-      docs.add((ObjectNode) NODE.mapper().readTree(text));
+      docs.add((ObjectNode) mapper.readTree(text));
     }
 
     NODE.insertBulk(NAME, docs);
@@ -85,6 +96,8 @@ public class AggregationTest {
             "select _MAP['cat1'] AS \"cat1\", "
                 + " _MAP['cat2']  AS \"cat2\", "
                 +  " _MAP['cat3'] AS \"cat3\", "
+                +  " _MAP['cat4'] AS \"cat4\", "
+                +  " _MAP['cat5'] AS \"cat5\", "
                 +  " _MAP['val1'] AS \"val1\", "
                 +  " _MAP['val2'] AS \"val2\" "
                 +  " from \"elastic\".\"%s\"", NAME);
@@ -118,7 +131,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void all() throws Exception {
+  public void all() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select count(*), sum(val1), sum(val2) from view")
@@ -141,7 +154,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void cat1() throws Exception {
+  public void cat1() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select cat1, sum(val1), sum(val2) from view group by cat1")
@@ -173,7 +186,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void cat2() throws Exception {
+  public void cat2() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select cat2, min(val1), max(val1), min(val2), max(val2) from view group by cat2")
@@ -194,7 +207,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void cat1Cat2() throws Exception {
+  public void cat1Cat2() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select cat1, cat2, sum(val1), sum(val2) from view group by cat1, cat2")
@@ -211,7 +224,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void cat1Cat3() throws Exception {
+  public void cat1Cat3() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select cat1, cat3, sum(val1), sum(val2) from view group by cat1, cat3")
@@ -224,7 +237,7 @@ public class AggregationTest {
    * Testing {@link org.apache.calcite.sql.SqlKind#ANY_VALUE} aggregate function
    */
   @Test
-  public void anyValue() throws Exception {
+  public void anyValue() {
     CalciteAssert.that()
         .with(newConnectionFactory())
         .query("select cat1, any_value(cat2) from view group by cat1")
@@ -247,7 +260,7 @@ public class AggregationTest {
   }
 
   @Test
-  public void cat1Cat2Cat3() throws Exception {
+  public void cat1Cat2Cat3() {
     CalciteAssert.that()
             .with(newConnectionFactory())
             .query("select cat1, cat2, cat3, count(*), sum(val1), sum(val2) from view "
@@ -256,6 +269,36 @@ public class AggregationTest {
                     "cat1=b; cat2=h; cat3=z; EXPR$3=1; EXPR$4=7.0; EXPR$5=42.0",
                     "cat1=null; cat2=g; cat3=y; EXPR$3=1; EXPR$4=0.0; EXPR$5=5.0");
   }
+
+  /**
+   * Group by
+   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html">
+   * date</a> data type.
+   */
+  @Test
+  public void dateCat() {
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat4, sum(val1) from view group by cat4")
+        .returnsUnordered("cat4=1514764800000; EXPR$1=1.0",
+            "cat4=1576108800000; EXPR$1=0.0",
+            "cat4=null; EXPR$1=7.0");
+  }
+
+  /**
+   * Group by
+   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html">
+   * number</a> data type.
+   */
+  @Test
+  public void integerCat() {
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat5, sum(val1) from view group by cat5")
+        .returnsUnordered("cat5=1; EXPR$1=1.0",
+            "cat5=null; EXPR$1=0.0",
+            "cat5=2; EXPR$1=7.0");
+  }
 }
 
 // End AggregationTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/BooleanLogicTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/BooleanLogicTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/BooleanLogicTest.java
index c870234..c3a233a 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/BooleanLogicTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/BooleanLogicTest.java
@@ -54,7 +54,7 @@ public class BooleanLogicTest {
   @BeforeClass
   public static void setupInstance() throws Exception {
 
-    final Map<String, String> mapping = ImmutableMap.of("A", "keyword", "b", "keyword",
+    final Map<String, String> mapping = ImmutableMap.of("a", "keyword", "b", "keyword",
         "c", "keyword", "int", "long");
 
     NODE.createIndex(NAME, mapping);
@@ -80,7 +80,8 @@ public class BooleanLogicTest {
                 +  " from \"elastic\".\"%s\"", NAME);
 
         ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
-                Collections.singletonList("elastic"), Arrays.asList("elastic", "view"), false);
+            Collections.singletonList("elastic"),
+            Arrays.asList("elastic", "view"), false);
         root.add("VIEW", macro);
 
         return connection;
@@ -131,7 +132,7 @@ public class BooleanLogicTest {
     assertSingle("select * from view where c = 'c' and (a in ('a', 'b') or num in (41, 42))");
     assertSingle("select * from view where (a = 'a' or b = 'b') or (num = 42 and c = 'c')");
     assertSingle("select * from view where a = 'a' and (b = '0' or (b = 'b' and "
-            +  "(c = '0' or (c = 'c' and num = 42))))");
+        +  "(c = '0' or (c = 'c' and num = 42))))");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/ed3da62d/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
index 5beec12..b792e20 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.calcite.adapter.elasticsearch;
 
 import org.apache.calcite.jdbc.CalciteConnection;