You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Julian Hyde <jh...@gmail.com> on 2018/09/19 14:34:28 UTC
Re: [2/2] calcite git commit: [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)
Thanks, Andrei. One thing: now that you’re a committer, your commit messages should not (must not) end with “(Andrei Sereda)”.
Julian
> On Sep 18, 2018, at 7:57 PM, sereda@apache.org wrote:
>
> [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)
>
> Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
>
> Add ElasticsearchAggregate relational expression to convert SQL into native Elastic aggregations (value_count, min, max etc.).
> Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
>
> Create special classes to recursively parse Elastic aggregation responses and buckets (located in ElasticsearchJson). They are inspired by the existing Elastic high-level client source.
>
> For tests, make JSON input more human-friendly. Single quotes are accepted and field names can be unquoted (unless
> they contain special characters). Also, fields with dots (e.g. 'a.b.c') are automatically expanded. This reduces JSON noise.
>
> Fix single projections, which previously returned a map (see [CALCITE-2485])
>
> Close apache/calcite#801
> Close apache/calcite#822
>
>
> Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
>
> Branch: refs/heads/master
> Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> Parents: ce05146
> Author: Andrei Sereda <25...@users.noreply.github.com>
> Authored: Tue Sep 18 22:53:24 2018 -0400
> Committer: Andrei Sereda <25...@users.noreply.github.com>
> Committed: Tue Sep 18 22:53:24 2018 -0400
>
> ----------------------------------------------------------------------
> elasticsearch/pom.xml | 6 +
> .../AbstractElasticsearchTable.java | 150 -----
> .../elasticsearch/ElasticsearchAggregate.java | 165 +++++
> .../elasticsearch/ElasticsearchConstants.java | 9 -
> .../elasticsearch/ElasticsearchEnumerators.java | 44 +-
> .../elasticsearch/ElasticsearchFilter.java | 17 +-
> .../elasticsearch/ElasticsearchJson.java | 614 +++++++++++++++++++
> .../elasticsearch/ElasticsearchMethod.java | 13 +-
> .../elasticsearch/ElasticsearchProject.java | 6 +-
> .../adapter/elasticsearch/ElasticsearchRel.java | 66 +-
> .../elasticsearch/ElasticsearchRules.java | 38 +-
> .../elasticsearch/ElasticsearchSchema.java | 30 +-
> .../elasticsearch/ElasticsearchSort.java | 41 +-
> .../elasticsearch/ElasticsearchTable.java | 313 +++++++++-
> .../elasticsearch/ElasticsearchTableScan.java | 6 +-
> .../ElasticsearchToEnumerableConverter.java | 46 +-
> .../elasticsearch/PredicateAnalyzer.java | 10 +
> .../adapter/elasticsearch/QueryBuilders.java | 106 +++-
> .../adapter/elasticsearch/AggregationTest.java | 235 +++++++
> .../adapter/elasticsearch/BooleanLogicTest.java | 1 +
> .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
> .../elasticsearch/ElasticsearchJsonTest.java | 183 ++++++
> .../EmbeddedElasticsearchPolicy.java | 41 +-
> .../adapter/elasticsearch/Projection2Test.java | 107 ++++
> .../adapter/elasticsearch/ProjectionTest.java | 37 +-
> .../elasticsearch/QueryBuildersTest.java | 63 ++
> .../calcite/test/ElasticsearchChecker.java | 90 ++-
> 27 files changed, 2340 insertions(+), 406 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
> index e3a044d..4700fee 100644
> --- a/elasticsearch/pom.xml
> +++ b/elasticsearch/pom.xml
> @@ -124,6 +124,12 @@ limitations under the License.
> <scope>test</scope>
> </dependency>
> <dependency>
> + <groupId>org.hamcrest</groupId>
> + <artifactId>hamcrest-core</artifactId>
> + <version>${hamcrest.version}</version>
> + <scope>test</scope>
> + </dependency>
> + <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-api</artifactId>
> </dependency>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> deleted file mode 100644
> index 1a0f6d0..0000000
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> +++ /dev/null
> @@ -1,150 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to you under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.calcite.adapter.elasticsearch;
> -
> -import org.apache.calcite.adapter.java.AbstractQueryableTable;
> -import org.apache.calcite.linq4j.Enumerable;
> -import org.apache.calcite.linq4j.Enumerator;
> -import org.apache.calcite.linq4j.QueryProvider;
> -import org.apache.calcite.linq4j.Queryable;
> -import org.apache.calcite.plan.RelOptCluster;
> -import org.apache.calcite.plan.RelOptTable;
> -import org.apache.calcite.rel.RelNode;
> -import org.apache.calcite.rel.type.RelDataType;
> -import org.apache.calcite.rel.type.RelDataTypeFactory;
> -import org.apache.calcite.schema.SchemaPlus;
> -import org.apache.calcite.schema.TranslatableTable;
> -import org.apache.calcite.schema.impl.AbstractTableQueryable;
> -import org.apache.calcite.sql.type.SqlTypeName;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Objects;
> -
> -/**
> - * Table based on an Elasticsearch type.
> - */
> -abstract class AbstractElasticsearchTable extends AbstractQueryableTable
> - implements TranslatableTable {
> -
> - final String indexName;
> - final String typeName;
> - final ObjectMapper mapper;
> -
> - /**
> - * Creates an ElasticsearchTable.
> - * @param indexName Elastic Search index
> - * @param typeName Elastic Search index type
> - * @param mapper Jackson API to parse (and created) JSON documents
> - */
> - AbstractElasticsearchTable(String indexName, String typeName, ObjectMapper mapper) {
> - super(Object[].class);
> - this.indexName = Objects.requireNonNull(indexName, "indexName");
> - this.typeName = Objects.requireNonNull(typeName, "typeName");
> - this.mapper = Objects.requireNonNull(mapper, "mapper");
> - }
> -
> - @Override public String toString() {
> - return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> - }
> -
> - public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> - final RelDataType mapType = relDataTypeFactory.createMapType(
> - relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> - relDataTypeFactory.createTypeWithNullability(
> - relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> - true));
> - return relDataTypeFactory.builder().add("_MAP", mapType).build();
> - }
> -
> - public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
> - String tableName) {
> - return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
> - }
> -
> - public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
> - final RelOptCluster cluster = context.getCluster();
> - return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> - relOptTable, this, null);
> - }
> -
> - /**
> - * In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
> - * {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
> - * Used to keep backwards compatibility with ES2.
> - *
> - * @see <a href="https://github.com/elastic/elasticsearch/issues/20068">_source variable</a>
> - * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted Fields</a>
> - * @return string to be used for scripted fields
> - */
> - protected abstract String scriptedFieldPrefix();
> -
> - /** Executes a "find" operation on the underlying type.
> - *
> - * <p>For example,
> - * <code>client.prepareSearch(index).setTypes(type)
> - * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> - *
> - * @param index Elasticsearch index
> - * @param ops List of operations represented as Json strings.
> - * @param fields List of fields to project; or null to return map
> - * @return Enumerator of results
> - */
> - protected abstract Enumerable<Object> find(String index, List<String> ops,
> - List<Map.Entry<String, Class>> fields);
> -
> - /**
> - * Implementation of {@link Queryable} based on
> - * a {@link AbstractElasticsearchTable}.
> - *
> - * @param <T> element type
> - */
> - public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
> - ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
> - AbstractElasticsearchTable table, String tableName) {
> - super(queryProvider, schema, table, tableName);
> - }
> -
> - public Enumerator<T> enumerator() {
> - return null;
> - }
> -
> - private String getIndex() {
> - return schema.unwrap(ElasticsearchSchema.class).getIndex();
> - }
> -
> - private AbstractElasticsearchTable getTable() {
> - return (AbstractElasticsearchTable) table;
> - }
> -
> - /** Called via code-generation.
> - * @param ops list of queries (as strings)
> - * @param fields projection
> - * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> - * @return result as enumerable
> - */
> - @SuppressWarnings("UnusedDeclaration")
> - public Enumerable<Object> find(List<String> ops,
> - List<Map.Entry<String, Class>> fields) {
> - return getTable().find(getIndex(), ops, fields);
> - }
> - }
> -}
> -
> -// End AbstractElasticsearchTable.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> new file mode 100644
> index 0000000..9627aca
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> @@ -0,0 +1,165 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import org.apache.calcite.plan.RelOptCluster;
> +import org.apache.calcite.plan.RelOptCost;
> +import org.apache.calcite.plan.RelOptPlanner;
> +import org.apache.calcite.plan.RelTraitSet;
> +import org.apache.calcite.rel.InvalidRelException;
> +import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.rel.core.Aggregate;
> +import org.apache.calcite.rel.core.AggregateCall;
> +import org.apache.calcite.rel.metadata.RelMetadataQuery;
> +import org.apache.calcite.rel.type.RelDataType;
> +import org.apache.calcite.rel.type.RelDataTypeField;
> +import org.apache.calcite.sql.SqlKind;
> +import org.apache.calcite.util.ImmutableBitSet;
> +
> +import java.util.ArrayList;
> +import java.util.EnumSet;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Set;
> +
> +/**
> + * Implementation of
> + * {@link org.apache.calcite.rel.core.Aggregate} relational expression
> + * for ElasticSearch.
> + */
> +public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRel {
> +
> + private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
> + EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, SqlKind.SUM);
> +
> + /** Creates an ElasticsearchAggregate. */
> + ElasticsearchAggregate(RelOptCluster cluster,
> + RelTraitSet traitSet,
> + RelNode input,
> + boolean indicator,
> + ImmutableBitSet groupSet,
> + List<ImmutableBitSet> groupSets,
> + List<AggregateCall> aggCalls) throws InvalidRelException {
> + super(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls);
> +
> + if (getConvention() != input.getConvention()) {
> + String message = String.format(Locale.ROOT, "%s != %s", getConvention(),
> + input.getConvention());
> + throw new AssertionError(message);
> + }
> +
> + assert getConvention() == input.getConvention();
> + assert getConvention() == ElasticsearchRel.CONVENTION;
> + assert this.groupSets.size() == 1 : "Grouping sets not supported";
> +
> + for (AggregateCall aggCall : aggCalls) {
> + if (aggCall.isDistinct()) {
> + throw new InvalidRelException("distinct aggregation not supported");
> + }
> +
> + SqlKind kind = aggCall.getAggregation().getKind();
> + if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
> + final String message = String.format(Locale.ROOT,
> + "Aggregation %s not supported (use one of %s)", kind, SUPPORTED_AGGREGATIONS);
> + throw new InvalidRelException(message);
> + }
> + }
> +
> + if (getGroupType() != Group.SIMPLE) {
> + final String message = String.format(Locale.ROOT, "Only %s grouping is supported. "
> + + "Yours is %s", Group.SIMPLE, getGroupType());
> + throw new InvalidRelException(message);
> + }
> +
> + }
> +
> + @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
> + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
> + List<AggregateCall> aggCalls) {
> + try {
> + return new ElasticsearchAggregate(getCluster(), traitSet, input,
> + indicator, groupSet, groupSets,
> + aggCalls);
> + } catch (InvalidRelException e) {
> + throw new AssertionError(e);
> + }
> + }
> +
> + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
> + return super.computeSelfCost(planner, mq).multiplyBy(0.1);
> + }
> +
> + @Override public void implement(Implementor implementor) {
> + implementor.visitChild(0, getInput());
> + List<String> inputFields = fieldNames(getInput().getRowType());
> +
> + for (int group : groupSet) {
> + implementor.addGroupBy(inputFields.get(group));
> + }
> +
> + for (AggregateCall aggCall : aggCalls) {
> + List<String> names = new ArrayList<>();
> + for (int i : aggCall.getArgList()) {
> + names.add(inputFields.get(i));
> + }
> +
> + final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
> +
> + String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
> + toElasticAggregate(aggCall),
> + name);
> +
> + implementor.addAggregation(aggCall.getName(), op);
> + }
> + }
> +
> + /**
> + * Most of the aggregations can be retrieved with single
> + * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html">stats</a>
> + * function. But currently only one-to-one mapping is supported between sql agg and elastic
> + * aggregation.
> + */
> + private String toElasticAggregate(AggregateCall call) {
> + SqlKind kind = call.getAggregation().getKind();
> + switch (kind) {
> + case COUNT:
> + return call.isApproximate() ? "cardinality" : "value_count";
> + case SUM:
> + return "sum";
> + case MIN:
> + return "min";
> + case MAX:
> + return "max";
> + case AVG:
> + return "avg";
> + default:
> + throw new IllegalArgumentException("Unknown aggregation kind " + kind + " for " + call);
> + }
> + }
> +
> + private List<String> fieldNames(RelDataType relDataType) {
> + List<String> names = new ArrayList<>();
> +
> + for (RelDataTypeField rdtf : relDataType.getFieldList()) {
> + names.add(rdtf.getName());
> + }
> + return names;
> + }
> +
> +}
> +
> +// End ElasticsearchAggregate.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> index ed628cc..2c4c42c 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> @@ -30,18 +30,9 @@ interface ElasticsearchConstants {
> String FIELDS = "fields";
> String SOURCE_PAINLESS = "params._source";
> String SOURCE_GROOVY = "_source";
> - String SOURCE = SOURCE_GROOVY;
> String ID = "_id";
> String UID = "_uid";
>
> - /* Aggregation pushdown operations supported */
> - String AGG_SUM = "SUM";
> - String AGG_SUM0 = "$SUM0";
> - String AGG_COUNT = "COUNT";
> - String AGG_MIN = "MIN";
> - String AGG_MAX = "MAX";
> - String AGG_AVG = "AVG";
> -
> Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
>
> }
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> index d87de7e..16ac92d 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> @@ -26,27 +26,27 @@ import java.util.Map;
>
> /**
> * Util functions which convert
> - * {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
> + * {@link ElasticsearchJson.SearchHit}
> * into calcite specific return type (map, object[], list etc.)
> */
> class ElasticsearchEnumerators {
>
> private ElasticsearchEnumerators() {}
>
> - private static Function1<ElasticsearchSearchResult.SearchHit, Map> mapGetter() {
> - return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
> - public Map apply(ElasticsearchSearchResult.SearchHit hits) {
> + private static Function1<ElasticsearchJson.SearchHit, Map> mapGetter() {
> + return new Function1<ElasticsearchJson.SearchHit, Map>() {
> + public Map apply(ElasticsearchJson.SearchHit hits) {
> return hits.sourceOrFields();
> }
> };
> }
>
> - private static Function1<ElasticsearchSearchResult.SearchHit, Object> singletonGetter(
> + private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
> final String fieldName,
> final Class fieldClass) {
> - return new Function1<ElasticsearchSearchResult.SearchHit, Object>() {
> - public Object apply(ElasticsearchSearchResult.SearchHit hits) {
> - return convert(hits.sourceOrFields(), fieldClass);
> + return new Function1<ElasticsearchJson.SearchHit, Object>() {
> + public Object apply(ElasticsearchJson.SearchHit hits) {
> + return convert(hits.valueOrNull(fieldName), fieldClass);
> }
> };
> }
> @@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
> *
> * @return function that converts the search result into a generic array
> */
> - private static Function1<ElasticsearchSearchResult.SearchHit, Object[]> listGetter(
> + private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
> final List<Map.Entry<String, Class>> fields) {
> - return new Function1<ElasticsearchSearchResult.SearchHit, Object[]>() {
> - public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
> + return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
> + public Object[] apply(ElasticsearchJson.SearchHit hit) {
> Object[] objects = new Object[fields.size()];
> for (int i = 0; i < fields.size(); i++) {
> final Map.Entry<String, Class> field = fields.get(i);
> final String name = field.getKey();
> final Class type = field.getValue();
> - objects[i] = convert(hit.value(name), type);
> + objects[i] = convert(hit.valueOrNull(name), type);
> }
> return objects;
> }
> };
> }
>
> - static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
> + static Function1<ElasticsearchJson.SearchHit, Object> getter(
> List<Map.Entry<String, Class>> fields) {
> //noinspection unchecked
> - return fields == null
> - ? (Function1) mapGetter()
> - : fields.size() == 1
> - ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
> - : (Function1) listGetter(fields);
> + final Function1 getter;
> + if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) {
> + // select * from table
> + getter = mapGetter();
> + } else if (fields.size() == 1) {
> + // select foo from table
> + getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue());
> + } else {
> + // select a, b, c from table
> + getter = listGetter(fields);
> + }
> +
> + return getter;
> }
>
> private static Object convert(Object o, Class clazz) {
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> index 4d187b1..c339671 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
> import org.apache.calcite.plan.RelTraitSet;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.core.Filter;
> -import org.apache.calcite.rel.core.Project;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rex.RexCall;
> import org.apache.calcite.rex.RexInputRef;
> @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
>
> @Override public void implement(Implementor implementor) {
> implementor.visitChild(0, getInput());
> - List<String> fieldNames;
> - if (input instanceof Project) {
> - final List<RexNode> projects = ((Project) input).getProjects();
> - fieldNames = new ArrayList<>(projects.size());
> - for (RexNode project : projects) {
> - String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
> - fieldNames.add(name);
> - }
> - } else {
> - fieldNames = ElasticsearchRules.elasticsearchFieldNames(getRowType());
> - }
> ObjectMapper mapper = implementor.elasticsearchTable.mapper;
> PredicateAnalyzerTranslator translator = new PredicateAnalyzerTranslator(mapper);
> try {
> implementor.add(translator.translateMatch(condition));
> } catch (IOException e) {
> throw new UncheckedIOException(e);
> - } catch (ExpressionNotAnalyzableException e) {
> + } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
> throw new RuntimeException(e);
> }
> }
> @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
> this.mapper = Objects.requireNonNull(mapper, "mapper");
> }
>
> - String translateMatch(RexNode condition) throws IOException, ExpressionNotAnalyzableException {
> + String translateMatch(RexNode condition) throws IOException,
> + PredicateAnalyzer.ExpressionNotAnalyzableException {
>
> StringWriter writer = new StringWriter();
> JsonGenerator generator = mapper.getFactory().createGenerator(writer);
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> new file mode 100644
> index 0000000..7c80e82
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> @@ -0,0 +1,614 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.core.JsonParser;
> +import com.fasterxml.jackson.core.JsonProcessingException;
> +import com.fasterxml.jackson.databind.DeserializationContext;
> +import com.fasterxml.jackson.databind.JsonNode;
> +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
> +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
> +import com.fasterxml.jackson.databind.node.ArrayNode;
> +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> +import com.fasterxml.jackson.databind.node.ObjectNode;
> +
> +import java.io.IOException;
> +import java.time.Duration;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.LinkedHashMap;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Map;
> +import java.util.Objects;
> +import java.util.Set;
> +import java.util.function.BiConsumer;
> +import java.util.function.Consumer;
> +import java.util.stream.StreamSupport;
> +
> +import static java.util.Collections.unmodifiableMap;
> +
> +/**
> + * Internal objects (and deserializers) used to parse elastic search results
> + * (which are in JSON format).
> + *
> + * <p>Since we're using the basic low-level REST client, the HTTP response has to be
> + * processed manually using a JSON (Jackson) library.
> + */
> +class ElasticsearchJson {
> +
> + /**
> + * Used as special aggregation key for missing values (documents which are missing a field).
> + * Buckets with that value are then converted to {@code null}s in flat tabular format.
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing Value</a>
> + */
> + static final JsonNode MISSING_VALUE = JsonNodeFactory.instance.textNode("__MISSING__");
> +
> + private ElasticsearchJson() {}
> +
> + /**
> + * Visits leaves of the aggregation where all values are stored.
> + */
> + static void visitValueNodes(Aggregations aggregations, Consumer<Map<String, Object>> consumer) {
> + Objects.requireNonNull(aggregations, "aggregations");
> + Objects.requireNonNull(consumer, "consumer");
> +
> + List<Bucket> buckets = new ArrayList<>();
> +
> + Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
> +
> + BiConsumer<RowKey, MultiValue> cons = (r, v) ->
> + rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
> + aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
> + rows.forEach((k, v) -> {
> + Map<String, Object> row = new LinkedHashMap<>(k.keys);
> + v.forEach(val -> row.put(val.getName(), val.value()));
> + consumer.accept(row);
> + });
> + }
> +
> + /**
> + * Identifies a calcite row (as in relational algebra)
> + */
> + private static class RowKey {
> + private final Map<String, Object> keys;
> + private final int hashCode;
> +
> + private RowKey(final Map<String, Object> keys) {
> + this.keys = Objects.requireNonNull(keys, "keys");
> + this.hashCode = Objects.hashCode(keys);
> + }
> +
> + private RowKey(List<Bucket> buckets) {
> + this(toMap(buckets));
> + }
> +
> + private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
> + return StreamSupport.stream(buckets.spliterator(), false)
> + .collect(LinkedHashMap::new,
> + (m, v) -> m.put(v.getName(), v.key()),
> + LinkedHashMap::putAll);
> + }
> +
> + @Override public boolean equals(final Object o) {
> + if (this == o) {
> + return true;
> + }
> + if (o == null || getClass() != o.getClass()) {
> + return false;
> + }
> + final RowKey rowKey = (RowKey) o;
> + return hashCode == rowKey.hashCode
> + && Objects.equals(keys, rowKey.keys);
> + }
> +
> + @Override public int hashCode() {
> + return this.hashCode;
> + }
> + }
> +
> + private static void visitValueNodes(Aggregation aggregation, List<Bucket> parents,
> + BiConsumer<RowKey, MultiValue> consumer) {
> +
> + if (aggregation instanceof MultiValue) {
> + // publish one value of the row
> + RowKey key = new RowKey(parents);
> + consumer.accept(key, (MultiValue) aggregation);
> + return;
> + }
> +
> + if (aggregation instanceof Bucket) {
> + Bucket bucket = (Bucket) aggregation;
> + parents.add(bucket);
> + bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> + parents.remove(parents.size() - 1);
> + } else if (aggregation instanceof HasAggregations) {
> + HasAggregations children = (HasAggregations) aggregation;
> + children.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> + } else if (aggregation instanceof MultiBucketsAggregation) {
> + MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
> + multi.buckets().forEach(b -> {
> + parents.add(b);
> + b.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> + parents.remove(parents.size() - 1);
> + });
> + }
> +
> + }
> +
> + /**
> + * Response from Elastic
> + */
> + @JsonIgnoreProperties(ignoreUnknown = true)
> + static class Result {
> + private final SearchHits hits;
> + private final Aggregations aggregations;
> + private final long took;
> +
> + /**
> + * Constructor for this instance.
> + * @param hits list of matched documents
> + * @param took time taken (in milliseconds) for this query to execute
> + */
> + @JsonCreator
> + Result(@JsonProperty("hits") SearchHits hits,
> + @JsonProperty("aggregations") Aggregations aggregations,
> + @JsonProperty("took") long took) {
> + this.hits = Objects.requireNonNull(hits, "hits");
> + this.aggregations = aggregations;
> + this.took = took;
> + }
> +
> + SearchHits searchHits() {
> + return hits;
> + }
> +
> + Aggregations aggregations() {
> + return aggregations;
> + }
> +
> + public Duration took() {
> + return Duration.ofMillis(took);
> + }
> +
> + }
> +
> + /**
> + * Similar to {@code SearchHits} in ES. Container for {@link SearchHit}
> + */
> + @JsonIgnoreProperties(ignoreUnknown = true)
> + static class SearchHits {
> +
> + private final long total;
> + private final List<SearchHit> hits;
> +
> + @JsonCreator
> + SearchHits(@JsonProperty("total")final long total,
> + @JsonProperty("hits") final List<SearchHit> hits) {
> + this.total = total;
> + this.hits = Objects.requireNonNull(hits, "hits");
> + }
> +
> + public List<SearchHit> hits() {
> + return this.hits;
> + }
> +
> + public long total() {
> + return total;
> + }
> +
> + }
> +
> + /**
> + * Concrete result record which matched the query. Similar to {@code SearchHit} in ES.
> + */
> + @JsonIgnoreProperties(ignoreUnknown = true)
> + static class SearchHit {
> + private final String id;
> + private final Map<String, Object> source;
> + private final Map<String, Object> fields;
> +
> + @JsonCreator
> + SearchHit(@JsonProperty("_id") final String id,
> + @JsonProperty("_source") final Map<String, Object> source,
> + @JsonProperty("fields") final Map<String, Object> fields) {
> + this.id = Objects.requireNonNull(id, "id");
> +
> + // both can't be null
> + if (source == null && fields == null) {
> + final String message = String.format(Locale.ROOT,
> + "Both '_source' and 'fields' are missing for %s", id);
> + throw new IllegalArgumentException(message);
> + }
> +
> + // both can't be non-null
> + if (source != null && fields != null) {
> + final String message = String.format(Locale.ROOT,
> + "Both '_source' and 'fields' are populated (non-null) for %s", id);
> + throw new IllegalArgumentException(message);
> + }
> +
> + this.source = source;
> + this.fields = fields;
> + }
> +
> + /**
> + * Returns id of this hit (usually document id)
> + * @return unique id
> + */
> + public String id() {
> + return id;
> + }
> +
> + Object valueOrNull(String name) {
> + Objects.requireNonNull(name, "name");
> + if (fields != null && fields.containsKey(name)) {
> + Object field = fields.get(name);
> + if (field instanceof Iterable) {
> + // return first element (or null)
> + Iterator<?> iter = ((Iterable<?>) field).iterator();
> + return iter.hasNext() ? iter.next() : null;
> + }
> +
> + return field;
> + }
> +
> + return valueFromPath(source, name);
> + }
> +
> + /**
> + * Returns property from nested maps given a path like {@code a.b.c}.
> + * @param map current map
> + * @param path field path(s), optionally with dots ({@code a.b.c}).
> + * @return value located at path {@code path} or {@code null} if not found.
> + */
> + private static Object valueFromPath(Map<String, Object> map, String path) {
> + if (map == null) {
> + return null;
> + }
> +
> + if (map.containsKey(path)) {
> + return map.get(path);
> + }
> +
> + // maybe pattern of type a.b.c
> + final int index = path.indexOf('.');
> + if (index == -1) {
> + return null;
> + }
> +
> + final String prefix = path.substring(0, index);
> + final String suffix = path.substring(index + 1);
> +
> + Object maybeMap = map.get(prefix);
> + if (maybeMap instanceof Map) {
> + return valueFromPath((Map<String, Object>) maybeMap, suffix);
> + }
> +
> + return null;
> + }
> +
> + Map<String, Object> source() {
> + return source;
> + }
> +
> + Map<String, Object> fields() {
> + return fields;
> + }
> +
> + Map<String, Object> sourceOrFields() {
> + return source != null ? source : fields;
> + }
> + }
> +
> +
> + /**
> + * {@link Aggregation} container.
> + */
> + @JsonDeserialize(using = AggregationsDeserializer.class)
> + static class Aggregations implements Iterable<Aggregation> {
> +
> + private final List<? extends Aggregation> aggregations;
> + private Map<String, Aggregation> aggregationsAsMap;
> +
> + Aggregations(List<? extends Aggregation> aggregations) {
> + this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
> + }
> +
> + /**
> + * Iterates over the {@link Aggregation}s.
> + */
> + @Override public final Iterator<Aggregation> iterator() {
> + return asList().iterator();
> + }
> +
> + /**
> + * The list of {@link Aggregation}s.
> + */
> + final List<Aggregation> asList() {
> + return Collections.unmodifiableList(aggregations);
> + }
> +
> + /**
> + * Returns the {@link Aggregation}s keyed by aggregation name. Lazy init.
> + */
> + final Map<String, Aggregation> asMap() {
> + if (aggregationsAsMap == null) {
> + Map<String, Aggregation> map = new LinkedHashMap<>(aggregations.size());
> + for (Aggregation aggregation : aggregations) {
> + map.put(aggregation.getName(), aggregation);
> + }
> + this.aggregationsAsMap = unmodifiableMap(map);
> + }
> + return aggregationsAsMap;
> + }
> +
> + /**
> + * Returns the aggregation that is associated with the specified name.
> + */
> + @SuppressWarnings("unchecked")
> + public final <A extends Aggregation> A get(String name) {
> + return (A) asMap().get(name);
> + }
> +
> + @Override public final boolean equals(Object obj) {
> + if (obj == null || getClass() != obj.getClass()) {
> + return false;
> + }
> + return aggregations.equals(((Aggregations) obj).aggregations);
> + }
> +
> + @Override public final int hashCode() {
> + return Objects.hash(getClass(), aggregations);
> + }
> +
> + }
> +
> + /**
> + * Identifies all aggregations
> + */
> + interface Aggregation {
> +
> + /**
> + * @return The name of this aggregation.
> + */
> + String getName();
> +
> + }
> +
> + /**
> + * Allows traversing aggregations tree
> + */
> + interface HasAggregations {
> + Aggregations getAggregations();
> + }
> +
> + /**
> + * An aggregation that returns multiple buckets
> + */
> + static class MultiBucketsAggregation implements Aggregation {
> +
> + private final String name;
> + private final List<Bucket> buckets;
> +
> + MultiBucketsAggregation(final String name,
> + final List<Bucket> buckets) {
> + this.name = name;
> + this.buckets = buckets;
> + }
> +
> + /**
> + * @return The buckets of this aggregation.
> + */
> + List<Bucket> buckets() {
> + return buckets;
> + }
> +
> + @Override public String getName() {
> + return name;
> + }
> + }
> +
> + /**
> + * A bucket represents a criterion to which all documents that fall in it adhere.
> + * It is also uniquely identified
> + * by a key, and can potentially hold sub-aggregations computed over all documents in it.
> + */
> + static class Bucket implements HasAggregations, Aggregation {
> + private final Object key;
> + private final String name;
> + private final Aggregations aggregations;
> +
> + Bucket(final Object key,
> + final String name,
> + final Aggregations aggregations) {
> + this.key = key; // key may be null (e.g. for a missing-value bucket)
> + this.name = Objects.requireNonNull(name, "name");
> + this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
> + }
> +
> + /**
> + * @return The key associated with the bucket
> + */
> + Object key() {
> + return key;
> + }
> +
> + /**
> + * @return The key associated with the bucket as a string
> + */
> + String keyAsString() {
> + return Objects.toString(key());
> + }
> +
> + /**
> + * @return The sub-aggregations of this bucket
> + */
> + @Override public Aggregations getAggregations() {
> + return aggregations;
> + }
> +
> + @Override public String getName() {
> + return name;
> + }
> + }
> +
> + /**
> + * Multi value aggregation like
> + * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html">Stats</a>
> + */
> + static class MultiValue implements Aggregation {
> + private final String name;
> + private final Map<String, Object> values;
> +
> + MultiValue(final String name, final Map<String, Object> values) {
> + this.name = Objects.requireNonNull(name, "name");
> + this.values = Objects.requireNonNull(values, "values");
> + }
> +
> + @Override public String getName() {
> + return name;
> + }
> +
> + Map<String, Object> values() {
> + return values;
> + }
> +
> + /**
> + * For single value. Returns single value represented by this leaf aggregation.
> + * @return value corresponding to {@code value}
> + */
> + Object value() {
> + if (!values().containsKey("value")) {
> + throw new IllegalStateException("'value' field not present in this aggregation");
> + }
> +
> + return values().get("value");
> + }
> +
> + }
> +
> + /**
> + * Allows to de-serialize nested aggregation structures.
> + */
> + static class AggregationsDeserializer extends StdDeserializer<Aggregations> {
> +
> + private static final Set<String> IGNORE_TOKENS = new HashSet<>(Arrays.asList("meta",
> + "buckets", "value", "values", "value_as_string", "doc_count", "key", "key_as_string"));
> +
> + AggregationsDeserializer() {
> + super(Aggregations.class);
> + }
> +
> + @Override public Aggregations deserialize(final JsonParser parser,
> + final DeserializationContext ctxt)
> + throws IOException {
> +
> + ObjectNode node = parser.getCodec().readTree(parser);
> + return parseAggregations(parser, node);
> + }
> +
> + private static Aggregations parseAggregations(JsonParser parser, ObjectNode node)
> + throws JsonProcessingException {
> +
> + List<Aggregation> aggregations = new ArrayList<>();
> +
> + Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
> + for (Map.Entry<String, JsonNode> entry : iter) {
> + final String name = entry.getKey();
> + final JsonNode value = entry.getValue();
> +
> + Aggregation agg = null;
> + if (value.has("buckets")) {
> + agg = parseBuckets(parser, name, (ArrayNode) value.get("buckets"));
> + } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
> + // leaf
> + agg = parseValue(parser, name, (ObjectNode) value);
> + }
> +
> + if (agg != null) {
> + aggregations.add(agg);
> + }
> + }
> +
> + return new Aggregations(aggregations);
> + }
> +
> +
> +
> + private static MultiValue parseValue(JsonParser parser, String name, ObjectNode node)
> + throws JsonProcessingException {
> +
> + return new MultiValue(name, parser.getCodec().treeToValue(node, Map.class));
> + }
> +
> + private static Aggregation parseBuckets(JsonParser parser, String name, ArrayNode nodes)
> + throws JsonProcessingException {
> +
> + List<Bucket> buckets = new ArrayList<>(nodes.size());
> + for (JsonNode b: nodes) {
> + buckets.add(parseBucket(parser, name, (ObjectNode) b));
> + }
> +
> + return new MultiBucketsAggregation(name, buckets);
> + }
> +
> + /**
> + * Determines if current key is a missing field key. Missing key is returned when document
> + * does not have pivoting attribute (example {@code GROUP BY _MAP['a.b.missing']}). It helps
> + * grouping documents which don't have a field. In relational algebra this
> + * would be {@code null}.
> + *
> + * @param key current {@code key} (usually string) as returned by ES
> + * @return {@code true} if this key denotes the missing-value bucket
> + * @see #MISSING_VALUE
> + */
> + private static boolean isMissingBucket(JsonNode key) {
> + return MISSING_VALUE.equals(key);
> + }
> +
> + private static Bucket parseBucket(JsonParser parser, String name, ObjectNode node)
> + throws JsonProcessingException {
> +
> + final JsonNode keyNode = node.get("key");
> + final Object key;
> + if (isMissingBucket(keyNode) || keyNode.isNull()) {
> + key = null;
> + } else if (keyNode.isTextual()) {
> + key = keyNode.textValue();
> + } else if (keyNode.isNumber()) {
> + key = keyNode.numberValue();
> + } else if (keyNode.isBoolean()) {
> + key = keyNode.booleanValue();
> + } else {
> + // don't usually expect keys to be Objects
> + key = parser.getCodec().treeToValue(node, Map.class);
> + }
> +
> + return new Bucket(key, name, parseAggregations(parser, node));
> + }
> +
> + }
> +}
> +
> +// End ElasticsearchJson.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> index 72753e6..709156f 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> @@ -27,8 +27,17 @@ import java.util.List;
> * Builtin methods in the Elasticsearch adapter.
> */
> enum ElasticsearchMethod {
> - ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
> - "find", List.class, List.class);
> +
> + ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class,
> + "find",
> + List.class, // ops - projections and other stuff
> + List.class, // fields
> + List.class, // sort
> + List.class, // groupBy
> + List.class, // aggregations
> + Long.class, // offset
> + Long.class // fetch
> + );
>
> public final Method method;
>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> index 7d5811c..d0841c3 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project implements ElasticsearchRel {
> query.append("\"script_fields\": {" + String.join(", ", scriptFields) + "}");
> }
>
> - for (String opfield : implementor.list) {
> - if (opfield.startsWith("\"_source\"")) {
> - implementor.list.remove(opfield);
> - }
> - }
> + implementor.list.removeIf(l -> l.startsWith("\"_source\""));
> implementor.add(query.toString());
> }
> }
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> index 436adf9..1dad691 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch;
>
> import org.apache.calcite.plan.Convention;
> import org.apache.calcite.plan.RelOptTable;
> +import org.apache.calcite.rel.RelFieldCollation;
> import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.util.Pair;
>
> import java.util.ArrayList;
> import java.util.List;
> +import java.util.Map;
> +import java.util.Objects;
>
> /**
> * Relational expression that uses Elasticsearch calling convention.
> @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode {
> * {@link ElasticsearchRel} nodes into an Elasticsearch query.
> */
> class Implementor {
> +
> final List<String> list = new ArrayList<>();
>
> + /**
> + * Sorting clauses.
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html">Sort</a>
> + */
> + final List<Map.Entry<String, RelFieldCollation.Direction>> sort = new ArrayList<>();
> +
> + /**
> + * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements (functions).
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html">aggregations</a>
> + */
> + final List<Map.Entry<String, String>> aggregations = new ArrayList<>();
> +
> + /**
> + * Allows bucketing documents together. Similar to {@code select ... from table group by field1}
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html">Bucket Aggregations</a>
> + */
> + final List<String> groupBy = new ArrayList<>();
> +
> + /**
> + * Starting index (default {@code 0}). Equivalent to {@code from} in ES query.
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a>
> + */
> + Long offset;
> +
> + /**
> + * Number of records to return. Equivalent to {@code size} in ES query.
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a>
> + */
> + Long fetch;
> +
> RelOptTable table;
> - AbstractElasticsearchTable elasticsearchTable;
> + ElasticsearchTable elasticsearchTable;
>
> - public void add(String findOp) {
> + void add(String findOp) {
> list.add(findOp);
> }
>
> - public void visitChild(int ordinal, RelNode input) {
> + void addGroupBy(String field) {
> + Objects.requireNonNull(field, "field");
> + groupBy.add(field);
> + }
> +
> + void addSort(String field, RelFieldCollation.Direction direction) {
> + Objects.requireNonNull(field, "field");
> + sort.add(new Pair<>(field, direction));
> + }
> +
> + void addAggregation(String field, String expression) {
> + Objects.requireNonNull(field, "field");
> + Objects.requireNonNull(expression, "expression");
> + aggregations.add(new Pair<>(field, expression));
> + }
> +
> + void offset(long offset) {
> + this.offset = offset;
> + }
> +
> + void fetch(long fetch) {
> + this.fetch = fetch;
> + }
> +
> + void visitChild(int ordinal, RelNode input) {
> assert ordinal == 0;
> ((ElasticsearchRel) input).implement(this);
> }
> +
> }
> }
>
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> index 97e934c..b442ddd 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention;
> import org.apache.calcite.plan.RelOptRule;
> import org.apache.calcite.plan.RelTrait;
> import org.apache.calcite.plan.RelTraitSet;
> +import org.apache.calcite.rel.InvalidRelException;
> import org.apache.calcite.rel.RelCollations;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.convert.ConverterRule;
> import org.apache.calcite.rel.core.Sort;
> +import org.apache.calcite.rel.logical.LogicalAggregate;
> import org.apache.calcite.rel.logical.LogicalFilter;
> import org.apache.calcite.rel.logical.LogicalProject;
> import org.apache.calcite.rel.type.RelDataType;
> @@ -53,7 +55,8 @@ class ElasticsearchRules {
> static final RelOptRule[] RULES = {
> ElasticsearchSortRule.INSTANCE,
> ElasticsearchFilterRule.INSTANCE,
> - ElasticsearchProjectRule.INSTANCE
> + ElasticsearchProjectRule.INSTANCE,
> + ElasticsearchAggregateRule.INSTANCE
> };
>
> private ElasticsearchRules() {}
> @@ -147,7 +150,7 @@ class ElasticsearchRules {
> }
> }
> throw new IllegalArgumentException("Translation of " + call.toString()
> - + "is not supported by ElasticsearchProject");
> + + " is not supported by ElasticsearchProject");
> }
>
> List<String> visitList(List<RexNode> list) {
> @@ -217,6 +220,37 @@ class ElasticsearchRules {
> }
>
> /**
> + * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalAggregate}
> + * to an {@link ElasticsearchAggregate}.
> + */
> + private static class ElasticsearchAggregateRule extends ElasticsearchConverterRule {
> + static final RelOptRule INSTANCE = new ElasticsearchAggregateRule();
> +
> + private ElasticsearchAggregateRule() {
> + super(LogicalAggregate.class, Convention.NONE, ElasticsearchRel.CONVENTION,
> + "ElasticsearchAggregateRule");
> + }
> +
> + public RelNode convert(RelNode rel) {
> + final LogicalAggregate agg = (LogicalAggregate) rel;
> + final RelTraitSet traitSet = agg.getTraitSet().replace(out);
> + try {
> + return new ElasticsearchAggregate(
> + rel.getCluster(),
> + traitSet,
> + convert(agg.getInput(), traitSet.simplify()),
> + agg.indicator,
> + agg.getGroupSet(),
> + agg.getGroupSets(),
> + agg.getAggCallList());
> + } catch (InvalidRelException e) {
> + return null;
> + }
> + }
> + }
> +
> +
> + /**
> * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
> * to an {@link ElasticsearchProject}.
> */
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> index 1c630ad..80a94be 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.UncheckedIOException;
> +import java.util.Collections;
> import java.util.Locale;
> import java.util.Map;
> import java.util.Objects;
> @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends AbstractSchema {
>
> private final ObjectMapper mapper;
>
> + private final Map<String, Table> tableMap;
> +
> /**
> * Allows schema to be instantiated from existing elastic search client.
> * This constructor is used in tests.
> @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends AbstractSchema {
> * @param index name of ES index
> */
> public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
> + this(client, mapper, index, null);
> + }
> +
> + public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index, String type) {
> super();
> this.client = Objects.requireNonNull(client, "client");
> this.mapper = Objects.requireNonNull(mapper, "mapper");
> this.index = Objects.requireNonNull(index, "index");
> + if (type == null) {
> + try {
> + this.tableMap = createTables(listTypesFromElastic());
> + } catch (IOException e) {
> + throw new UncheckedIOException("Couldn't get types for " + index, e);
> + }
> + } else {
> + this.tableMap = createTables(Collections.singleton(type));
> + }
> }
>
> @Override protected Map<String, Table> getTableMap() {
> + return tableMap;
> + }
> +
> + private Map<String, Table> createTables(Iterable<String> types) {
> final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
> - try {
> - for (String type: listTypes()) {
> - builder.put(type, new ElasticsearchTable(client, mapper, index, type));
> - }
> - } catch (IOException e) {
> - throw new UncheckedIOException("Failed to get types for " + index, e);
> + for (String type : types) {
> + builder.put(type, new ElasticsearchTable(client, mapper, index, type));
> }
> return builder.build();
> }
> @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends AbstractSchema {
> * @throws IOException for any IO related issues
> * @throws IllegalStateException if reply is not understood
> */
> - private Set<String> listTypes() throws IOException {
> + private Set<String> listTypesFromElastic() throws IOException {
> final String endpoint = "/" + index + "/_mapping";
> final Response response = client.performRequest("GET", endpoint);
> try (InputStream is = response.getEntity().getContent()) {
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> index ed669aa..9078b72 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet;
> import org.apache.calcite.rel.RelCollation;
> import org.apache.calcite.rel.RelFieldCollation;
> import org.apache.calcite.rel.RelNode;
> -import org.apache.calcite.rel.core.Project;
> import org.apache.calcite.rel.core.Sort;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rel.type.RelDataTypeField;
> import org.apache.calcite.rex.RexLiteral;
> import org.apache.calcite.rex.RexNode;
> -import org.apache.calcite.util.Util;
>
> -import java.util.ArrayList;
> import java.util.List;
>
> /**
> @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort implements ElasticsearchRel {
>
> @Override public void implement(Implementor implementor) {
> implementor.visitChild(0, getInput());
> - if (!collation.getFieldCollations().isEmpty()) {
> - final List<String> keys = new ArrayList<>();
> - if (input instanceof Project) {
> - final List<RexNode> projects = ((Project) input).getProjects();
> + final List<RelDataTypeField> fields = getRowType().getFieldList();
>
> - for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> - RexNode project = projects.get(fieldCollation.getFieldIndex());
> - String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
> - keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
> - }
> - } else {
> - final List<RelDataTypeField> fields = getRowType().getFieldList();
> -
> - for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> - final String name = fields.get(fieldCollation.getFieldIndex()).getName();
> - keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
> - }
> - }
> -
> - implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
> + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> + final String name = fields.get(fieldCollation.getFieldIndex()).getName();
> + implementor.addSort(name, fieldCollation.getDirection());
> }
>
> if (offset != null) {
> - implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
> + implementor.offset(((RexLiteral) offset).getValueAs(Long.class));
> }
>
> if (fetch != null) {
> - implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
> + implementor.fetch(((RexLiteral) fetch).getValueAs(Long.class));
> }
> }
>
> - private String direction(RelFieldCollation fieldCollation) {
> - switch (fieldCollation.getDirection()) {
> - case DESCENDING:
> - case STRICTLY_DESCENDING:
> - return "\"desc\"";
> - case ASCENDING:
> - case STRICTLY_ASCENDING:
> - default:
> - return "\"asc\"";
> - }
> - }
> }
>
> // End ElasticsearchSort.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> index 955636e..c404da7 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> @@ -16,9 +16,24 @@
> */
> package org.apache.calcite.adapter.elasticsearch;
>
> +import org.apache.calcite.adapter.java.AbstractQueryableTable;
> import org.apache.calcite.linq4j.Enumerable;
> +import org.apache.calcite.linq4j.Enumerator;
> import org.apache.calcite.linq4j.Linq4j;
> +import org.apache.calcite.linq4j.QueryProvider;
> +import org.apache.calcite.linq4j.Queryable;
> import org.apache.calcite.linq4j.function.Function1;
> +import org.apache.calcite.plan.RelOptCluster;
> +import org.apache.calcite.plan.RelOptTable;
> +import org.apache.calcite.rel.RelFieldCollation;
> +import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.rel.type.RelDataType;
> +import org.apache.calcite.rel.type.RelDataTypeFactory;
> +import org.apache.calcite.runtime.Hook;
> +import org.apache.calcite.schema.SchemaPlus;
> +import org.apache.calcite.schema.TranslatableTable;
> +import org.apache.calcite.schema.impl.AbstractTableQueryable;
> +import org.apache.calcite.sql.type.SqlTypeName;
> import org.apache.calcite.util.Util;
>
> import org.apache.http.HttpEntity;
> @@ -29,26 +44,47 @@ import org.apache.http.util.EntityUtils;
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.fasterxml.jackson.databind.node.ArrayNode;
> +import com.fasterxml.jackson.databind.node.ObjectNode;
>
> import org.elasticsearch.client.Response;
> import org.elasticsearch.client.RestClient;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.UncheckedIOException;
> +import java.util.ArrayList;
> import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.LinkedHashMap;
> +import java.util.LinkedHashSet;
> import java.util.List;
> import java.util.Locale;
> import java.util.Map;
> import java.util.Objects;
> +import java.util.Set;
> +import java.util.function.Predicate;
> +import java.util.stream.Collectors;
>
> /**
> * Table based on an Elasticsearch type.
> */
> -public class ElasticsearchTable extends AbstractElasticsearchTable {
> +public class ElasticsearchTable extends AbstractQueryableTable implements TranslatableTable {
> +
> + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTable.class);
> +
> + /**
> + * Used for constructing (possibly nested) Elastic aggregation nodes.
> + */
> + private static final String AGGREGATIONS = "aggregations";
> +
> private final RestClient restClient;
> private final ElasticsearchVersion version;
> -
> + private final String indexName;
> + private final String typeName;
> + final ObjectMapper mapper;
>
> /**
> * Creates an ElasticsearchTable.
> @@ -58,7 +94,7 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
> * @param typeName elastic search index type
> */
> ElasticsearchTable(RestClient client, ObjectMapper mapper, String indexName, String typeName) {
> - super(indexName, typeName, Objects.requireNonNull(mapper, "mapper"));
> + super(Object[].class);
> this.restClient = Objects.requireNonNull(client, "client");
> try {
> this.version = detectVersion(client, mapper);
> @@ -67,6 +103,9 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
> + "for %s/%s", indexName, typeName);
> throw new UncheckedIOException(message, e);
> }
> + this.indexName = Objects.requireNonNull(indexName, "indexName");
> + this.typeName = Objects.requireNonNull(typeName, "typeName");
> + this.mapper = Objects.requireNonNull(mapper, "mapper");
>
> }
>
> @@ -87,37 +126,211 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
> return ElasticsearchVersion.fromString(node.get("version").get("number").asText());
> }
>
> - @Override protected String scriptedFieldPrefix() {
> + /**
> + * In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
> + * {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
> + * Used to keep backwards compatibility with ES2.
> + *
> + * @see <a href="https://github.com/elastic/elasticsearch/issues/20068">_source variable</a>
> + * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted Fields</a>
> + * @return string to be used for scripted fields
> + */
> + String scriptedFieldPrefix() {
> // ES2 vs ES5 scripted field difference
> return version == ElasticsearchVersion.ES2
> ? ElasticsearchConstants.SOURCE_GROOVY
> : ElasticsearchConstants.SOURCE_PAINLESS;
> }
>
> - @Override protected Enumerable<Object> find(String index, List<String> ops,
> - List<Map.Entry<String, Class>> fields) {
> + /**
> + * Executes a "find" operation on the underlying type.
> + *
> + * <p>For example,
> + * <code>client.prepareSearch(index).setTypes(type)
> + * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> + *
> + * @param ops List of operations represented as Json strings.
> + * @param fields List of fields to project; or null to return map
> + * @param sort list of fields to sort and their direction (asc/desc)
> + * @param aggregations aggregation functions
> + * @return Enumerator of results
> + */
> + protected Enumerable<Object> find(List<String> ops,
> + List<Map.Entry<String, Class>> fields,
> + List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> + List<String> groupBy,
> + List<Map.Entry<String, String>> aggregations,
> + Long offset, Long fetch) throws IOException {
> +
> + if (!aggregations.isEmpty()) {
> + // process aggregations separately
> + return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
> + }
> +
> + final ObjectNode query = mapper.createObjectNode();
>
> - final String query;
> - if (!ops.isEmpty()) {
> - query = "{" + Util.toString(ops, "", ", ", "") + "}";
> - } else {
> - query = "{}";
> + // manually parse from previously concatenated string
> + query.setAll(
> + (ObjectNode) mapper.readTree("{"
> + + Util.toString(ops, "", ", ", "") + "}"));
> +
> + if (!sort.isEmpty()) {
> + ArrayNode sortNode = query.withArray("sort");
> + sort.forEach(e ->
> + sortNode.add(
> + mapper.createObjectNode().put(e.getKey(), e.getValue().isDescending() ? "desc" : "asc"))
> + );
> + }
> +
> + if (offset != null) {
> + query.put("from", offset);
> + }
> +
> + if (fetch != null) {
> + query.put("size", fetch);
> }
>
> try {
> - ElasticsearchSearchResult result = httpRequest(query);
> - final Function1<ElasticsearchSearchResult.SearchHit, Object> getter =
> + ElasticsearchJson.Result search = httpRequest(query);
> + final Function1<ElasticsearchJson.SearchHit, Object> getter =
> ElasticsearchEnumerators.getter(fields);
> - return Linq4j.asEnumerable(result.searchHits().hits()).select(getter);
> + return Linq4j.asEnumerable(search.searchHits().hits()).select(getter);
> } catch (IOException e) {
> throw new UncheckedIOException(e);
> }
> }
>
> - private ElasticsearchSearchResult httpRequest(String query) throws IOException {
> + private Enumerable<Object> aggregate(List<String> ops,
> + List<Map.Entry<String, Class>> fields,
> + List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> + List<String> groupBy,
> + List<Map.Entry<String, String>> aggregations,
> + Long offset, Long fetch) throws IOException {
> +
> + if (aggregations.isEmpty()) {
> + throw new IllegalArgumentException("Missing Aggregations");
> + }
> +
> + if (!groupBy.isEmpty() && offset != null) {
> + String message = "Currently ES doesn't support generic pagination "
> + + "with aggregations. You can still use LIMIT keyword (without OFFSET). "
> + + "For more details see https://github.com/elastic/elasticsearch/issues/4915";
> + throw new IllegalStateException(message);
> + }
> +
> + final ObjectNode query = mapper.createObjectNode();
> +
> + // manually parse into JSON from previously concatenated strings
> + query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", ", ", "") + "}"));
> +
> + // remove / override attributes which are not applicable to aggregations
> + query.put("_source", false);
> + query.put("size", 0);
> + query.remove("script_fields");
> +
> + // allows to detect aggregation for count(*)
> + final Predicate<Map.Entry<String, String>> isCountStar = e -> e.getValue()
> + .contains("\"" + ElasticsearchConstants.ID + "\"");
> +
> + // list of expressions which are count(*)
> + final Set<String> countAll = aggregations.stream()
> + .filter(isCountStar)
> + .map(Map.Entry::getKey).collect(Collectors.toSet());
> +
> + final Map<String, String> fieldMap = new HashMap<>();
> +
> + // due to ES aggregation format. fields in "order by" clause should go first
> + // if "order by" is missing. order in "group by" is un-important
> + final Set<String> orderedGroupBy = new LinkedHashSet<>();
> + orderedGroupBy.addAll(sort.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
> + orderedGroupBy.addAll(groupBy);
> +
> + // construct nested aggregations node(s)
> + ObjectNode parent = query.with(AGGREGATIONS);
> + for (String name: orderedGroupBy) {
> + final String aggName = "g_" + name;
> + fieldMap.put(aggName, name);
> +
> + final ObjectNode section = parent.with(aggName);
> + final ObjectNode terms = section.with("terms");
> + terms.put("field", name);
> + terms.set("missing", ElasticsearchJson.MISSING_VALUE); // expose missing terms
> +
> + if (fetch != null) {
> + terms.put("size", fetch);
> + }
> +
> + sort.stream().filter(e -> e.getKey().equals(name)).findAny().ifPresent(s -> {
> + terms.with("order").put("_key", s.getValue().isDescending() ? "desc" : "asc");
> + });
> +
> + parent = section.with(AGGREGATIONS);
> + }
> +
> + // simple version for queries like "select count(*), max(col1) from table" (no GROUP BY cols)
> + if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) {
> + for (Map.Entry<String, String> aggregation : aggregations) {
> + JsonNode value = mapper.readTree("{" + aggregation.getValue() + "}");
> + parent.set(aggregation.getKey(), value);
> + }
> + }
> +
> + // cleanup query. remove empty AGGREGATIONS element (if empty)
> + JsonNode agg = query;
> + while (agg.has(AGGREGATIONS) && agg.get(AGGREGATIONS).elements().hasNext()) {
> + agg = agg.get(AGGREGATIONS);
> + }
> + ((ObjectNode) agg).remove(AGGREGATIONS);
> +
> + ElasticsearchJson.Result res = httpRequest(query);
> +
> + final List<Map<String, Object>> result = new ArrayList<>();
> + if (res.aggregations() != null) {
> + // collect values
> + ElasticsearchJson.visitValueNodes(res.aggregations(), m -> {
> + Map<String, Object> newMap = new LinkedHashMap<>();
> + for (String key: m.keySet()) {
> + newMap.put(fieldMap.getOrDefault(key, key), m.get(key));
> + }
> + result.add(newMap);
> + });
> + } else {
> + // probably no group by. add single result
> + result.add(new LinkedHashMap<>());
> + }
> +
> + // elastic exposes total number of documents matching a query in "/hits/total" path
> + // this can be used for simple "select count(*) from table"
> + final long total = res.searchHits().total();
> +
> + if (groupBy.isEmpty()) {
> + // put totals automatically for count(*) expression(s), unless they contain group by
> + for (String expr : countAll) {
> + result.forEach(m -> m.put(expr, total));
> + }
> + }
> +
> + final Function1<ElasticsearchJson.SearchHit, Object> getter =
> + ElasticsearchEnumerators.getter(fields);
> +
> + ElasticsearchJson.SearchHits hits =
> + new ElasticsearchJson.SearchHits(total, result.stream()
> + .map(r -> new ElasticsearchJson.SearchHit("_id", r, null))
> + .collect(Collectors.toList()));
> +
> + return Linq4j.asEnumerable(hits.hits()).select(getter);
> + }
> +
> + private ElasticsearchJson.Result httpRequest(ObjectNode query) throws IOException {
> Objects.requireNonNull(query, "query");
> String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, typeName);
> - HttpEntity entity = new StringEntity(query, ContentType.APPLICATION_JSON);
> +
> + Hook.QUERY_PLAN.run(query);
> + final String json = mapper.writeValueAsString(query);
> +
> + LOGGER.debug("Elasticsearch Query: {}", json);
> +
> + HttpEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
> Response response = restClient.performRequest("POST", uri, Collections.emptyMap(), entity);
> if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
> final String error = EntityUtils.toString(response.getEntity());
> @@ -128,9 +341,75 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
> }
>
> try (InputStream is = response.getEntity().getContent()) {
> - return mapper.readValue(is, ElasticsearchSearchResult.class);
> + return mapper.readValue(is, ElasticsearchJson.Result.class);
> }
> }
> +
> + @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> + final RelDataType mapType = relDataTypeFactory.createMapType(
> + relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> + relDataTypeFactory.createTypeWithNullability(
> + relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> + true));
> + return relDataTypeFactory.builder().add("_MAP", mapType).build();
> + }
> +
> + @Override public String toString() {
> + return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> + }
> +
> + @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
> + String tableName) {
> + return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
> + }
> +
> + @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
> + final RelOptCluster cluster = context.getCluster();
> + return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> + relOptTable, this, null);
> + }
> +
> + /**
> + * Implementation of {@link Queryable} based on
> + * a {@link ElasticsearchTable}.
> + *
> + * @param <T> element type
> + */
> + public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
> + ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
> + ElasticsearchTable table, String tableName) {
> + super(queryProvider, schema, table, tableName);
> + }
> +
> + public Enumerator<T> enumerator() {
> + return null;
> + }
> +
> + private ElasticsearchTable getTable() {
> + return (ElasticsearchTable) table;
> + }
> +
> + /** Called via code-generation.
> + * @param ops list of queries (as strings)
> + * @param fields projection
> + * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> + * @return result as enumerable
> + */
> + @SuppressWarnings("UnusedDeclaration")
> + public Enumerable<Object> find(List<String> ops,
> + List<Map.Entry<String, Class>> fields,
> + List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> + List<String> groupBy,
> + List<Map.Entry<String, String>> aggregations,
> + Long offset, Long fetch) {
> + try {
> + return getTable().find(ops, fields, sort, groupBy, aggregations, offset, fetch);
> + } catch (IOException e) {
> + throw new UncheckedIOException("Failed to query " + getTable().indexName, e);
> + }
> + }
> +
> + }
> }
>
> // End ElasticsearchTable.java
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> index 7795ad3..3dd041a 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> @@ -37,7 +37,7 @@ import java.util.Objects;
> * using the "find" method.</p>
> */
> public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
> - private final AbstractElasticsearchTable elasticsearchTable;
> + private final ElasticsearchTable elasticsearchTable;
> private final RelDataType projectRowType;
>
> /**
> @@ -50,10 +50,10 @@ public class ElasticsearchTableScan extends TableScan implements ElasticsearchRe
> * @param projectRowType Fields and types to project; null to project raw row
> */
> ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet,
> - RelOptTable table, AbstractElasticsearchTable elasticsearchTable,
> + RelOptTable table, ElasticsearchTable elasticsearchTable,
> RelDataType projectRowType) {
> super(cluster, traitSet, table);
> - this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable);
> + this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable, "elasticsearchTable");
> this.projectRowType = projectRowType;
>
> assert getConvention() == ElasticsearchRel.CONVENTION;
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> index 51a2bd5..5e788a8 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> @@ -30,12 +30,10 @@ import org.apache.calcite.plan.RelOptCluster;
> import org.apache.calcite.plan.RelOptCost;
> import org.apache.calcite.plan.RelOptPlanner;
> import org.apache.calcite.plan.RelTraitSet;
> -import org.apache.calcite.prepare.CalcitePrepareImpl;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.convert.ConverterImpl;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rel.type.RelDataType;
> -import org.apache.calcite.runtime.Hook;
> import org.apache.calcite.util.BuiltInMethod;
> import org.apache.calcite.util.Pair;
>
> @@ -60,15 +58,15 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
> return super.computeSelfCost(planner, mq).multiplyBy(.1);
> }
>
> - @Override public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
> - final BlockBuilder list = new BlockBuilder();
> - final ElasticsearchRel.Implementor elasticsearchImplementor =
> - new ElasticsearchRel.Implementor();
> - elasticsearchImplementor.visitChild(0, getInput());
> + @Override public Result implement(EnumerableRelImplementor relImplementor, Prefer prefer) {
> + final BlockBuilder block = new BlockBuilder();
> + final ElasticsearchRel.Implementor implementor = new ElasticsearchRel.Implementor();
> + implementor.visitChild(0, getInput());
> +
> final RelDataType rowType = getRowType();
> - final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
> + final PhysType physType = PhysTypeImpl.of(relImplementor.getTypeFactory(), rowType,
> prefer.prefer(JavaRowFormat.ARRAY));
> - final Expression fields = list.append("fields",
> + final Expression fields = block.append("fields",
> constantArrayList(
> Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType),
> new AbstractList<Class>() {
> @@ -81,20 +79,24 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
> }
> }),
> Pair.class));
> - final Expression table = list.append("table",
> - elasticsearchImplementor.table
> - .getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class));
> - List<String> opList = elasticsearchImplementor.list;
> - final Expression ops = list.append("ops", constantArrayList(opList, String.class));
> - Expression enumerable = list.append("enumerable",
> + final Expression table = block.append("table",
> + implementor.table
> + .getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
> + List<String> opList = implementor.list;
> + final Expression ops = block.append("ops", constantArrayList(opList, String.class));
> + final Expression sort = block.append("sort", constantArrayList(implementor.sort, Pair.class));
> + final Expression groupBy = block.append("groupBy", Expressions.constant(implementor.groupBy));
> + final Expression aggregations = block.append("aggregations",
> + constantArrayList(implementor.aggregations, Pair.class));
> +
> + final Expression offset = block.append("offset", Expressions.constant(implementor.offset));
> + final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch));
> +
> + Expression enumerable = block.append("enumerable",
> Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
> - fields));
> - if (CalcitePrepareImpl.DEBUG) {
> - System.out.println("Elasticsearch: " + opList);
> - }
> - Hook.QUERY_PLAN.run(opList);
> - list.add(Expressions.return_(null, enumerable));
> - return implementor.result(physType, list.toBlock());
> + fields, sort, groupBy, aggregations, offset, fetch));
> + block.add(Expressions.return_(null, enumerable));
> + return relImplementor.result(physType, block.toBlock());
> }
>
> /** E.g. {@code constantArrayList("x", "y")} returns
>
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> index 97f7943..a866fe4 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> @@ -78,6 +78,16 @@ class PredicateAnalyzer {
> }
> }
>
> + /**
> + * Thrown when {@link org.apache.calcite.rel.RelNode} expression can't be processed
> + * (or converted into ES query)
> + */
> + static class ExpressionNotAnalyzableException extends Exception {
> + ExpressionNotAnalyzableException(String message, Throwable cause) {
> + super(message, cause);
> + }
> + }
> +
> private PredicateAnalyzer() {}
>
> /**
>
Re: [2/2] calcite git commit: [CALCITE-2528] Support Aggregates in
ElasticSearch adapter (Andrei Sereda)
Posted by Andrei Sereda <an...@sereda.cc>.
Thank you, Julian, for the correction.
I wasn't sure if I should remove it since the original commit (and PR) was
created when I wasn't yet a committer. Going forward I will not include my
name for personal commits.
On Wed, Sep 19, 2018 at 10:34 AM Julian Hyde <jh...@gmail.com> wrote:
> Thanks Andrei. One thing: now you’re a committer your commit messages
> should not (must not) end with “(Andrei Sereda)”.
>
> Julian
>
> > On Sep 18, 2018, at 7:57 PM, sereda@apache.org wrote:
> >
> > [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei
> Sereda)
> >
> > Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
> >
> > Add ElasticsearchAggregate relational expression to convert SQL into
> native Elastic aggregations (value_count, min, max etc.).
> > Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
> >
> > Create special classes to parse recursively elastic aggregation response
> or buckets (located in ElasticJson). They're inspired from existing Elastic
> high-level client source.
> >
> > For tests, make Json input more human friendly. Single quotes are
> accepted and fields can be unquoted (unless
> > they contain special characters). Also field with dots 'a.b.c' are
> automatically auto-expanded. This reduces JSON noise.
> >
> > Fix single projections which previously returned map (see [CALCITE-2485])
> >
> > Close apache/calcite#801
> > Close apache/calcite#822
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> > Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> > Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
> >
> > Branch: refs/heads/master
> > Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> > Parents: ce05146
> > Author: Andrei Sereda <25...@users.noreply.github.com>
> > Authored: Tue Sep 18 22:53:24 2018 -0400
> > Committer: Andrei Sereda <25...@users.noreply.github.com>
> > Committed: Tue Sep 18 22:53:24 2018 -0400
> >
> > ----------------------------------------------------------------------
> > elasticsearch/pom.xml | 6 +
> > .../AbstractElasticsearchTable.java | 150 -----
> > .../elasticsearch/ElasticsearchAggregate.java | 165 +++++
> > .../elasticsearch/ElasticsearchConstants.java | 9 -
> > .../elasticsearch/ElasticsearchEnumerators.java | 44 +-
> > .../elasticsearch/ElasticsearchFilter.java | 17 +-
> > .../elasticsearch/ElasticsearchJson.java | 614 +++++++++++++++++++
> > .../elasticsearch/ElasticsearchMethod.java | 13 +-
> > .../elasticsearch/ElasticsearchProject.java | 6 +-
> > .../adapter/elasticsearch/ElasticsearchRel.java | 66 +-
> > .../elasticsearch/ElasticsearchRules.java | 38 +-
> > .../elasticsearch/ElasticsearchSchema.java | 30 +-
> > .../elasticsearch/ElasticsearchSort.java | 41 +-
> > .../elasticsearch/ElasticsearchTable.java | 313 +++++++++-
> > .../elasticsearch/ElasticsearchTableScan.java | 6 +-
> > .../ElasticsearchToEnumerableConverter.java | 46 +-
> > .../elasticsearch/PredicateAnalyzer.java | 10 +
> > .../adapter/elasticsearch/QueryBuilders.java | 106 +++-
> > .../adapter/elasticsearch/AggregationTest.java | 235 +++++++
> > .../adapter/elasticsearch/BooleanLogicTest.java | 1 +
> > .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
> > .../elasticsearch/ElasticsearchJsonTest.java | 183 ++++++
> > .../EmbeddedElasticsearchPolicy.java | 41 +-
> > .../adapter/elasticsearch/Projection2Test.java | 107 ++++
> > .../adapter/elasticsearch/ProjectionTest.java | 37 +-
> > .../elasticsearch/QueryBuildersTest.java | 63 ++
> > .../calcite/test/ElasticsearchChecker.java | 90 ++-
> > 27 files changed, 2340 insertions(+), 406 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
> > ----------------------------------------------------------------------
> > diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
> > index e3a044d..4700fee 100644
> > --- a/elasticsearch/pom.xml
> > +++ b/elasticsearch/pom.xml
> > @@ -124,6 +124,12 @@ limitations under the License.
> > <scope>test</scope>
> > </dependency>
> > <dependency>
> > + <groupId>org.hamcrest</groupId>
> > + <artifactId>hamcrest-core</artifactId>
> > + <version>${hamcrest.version}</version>
> > + <scope>test</scope>
> > + </dependency>
> > + <dependency>
> > <groupId>org.slf4j</groupId>
> > <artifactId>slf4j-api</artifactId>
> > </dependency>
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > deleted file mode 100644
> > index 1a0f6d0..0000000
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > +++ /dev/null
> > @@ -1,150 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one or more
> > - * contributor license agreements. See the NOTICE file distributed with
> > - * this work for additional information regarding copyright ownership.
> > - * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > - * (the "License"); you may not use this file except in compliance with
> > - * the License. You may obtain a copy of the License at
> > - *
> > - * http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing, software
> > - * distributed under the License is distributed on an "AS IS" BASIS,
> > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > - * See the License for the specific language governing permissions and
> > - * limitations under the License.
> > - */
> > -package org.apache.calcite.adapter.elasticsearch;
> > -
> > -import org.apache.calcite.adapter.java.AbstractQueryableTable;
> > -import org.apache.calcite.linq4j.Enumerable;
> > -import org.apache.calcite.linq4j.Enumerator;
> > -import org.apache.calcite.linq4j.QueryProvider;
> > -import org.apache.calcite.linq4j.Queryable;
> > -import org.apache.calcite.plan.RelOptCluster;
> > -import org.apache.calcite.plan.RelOptTable;
> > -import org.apache.calcite.rel.RelNode;
> > -import org.apache.calcite.rel.type.RelDataType;
> > -import org.apache.calcite.rel.type.RelDataTypeFactory;
> > -import org.apache.calcite.schema.SchemaPlus;
> > -import org.apache.calcite.schema.TranslatableTable;
> > -import org.apache.calcite.schema.impl.AbstractTableQueryable;
> > -import org.apache.calcite.sql.type.SqlTypeName;
> > -
> > -import com.fasterxml.jackson.databind.ObjectMapper;
> > -
> > -import java.util.List;
> > -import java.util.Map;
> > -import java.util.Objects;
> > -
> > -/**
> > - * Table based on an Elasticsearch type.
> > - */
> > -abstract class AbstractElasticsearchTable extends AbstractQueryableTable
> > - implements TranslatableTable {
> > -
> > - final String indexName;
> > - final String typeName;
> > - final ObjectMapper mapper;
> > -
> > - /**
> > - * Creates an ElasticsearchTable.
> > - * @param indexName Elastic Search index
> > - * @param typeName Elastic Search index type
> > - * @param mapper Jackson API to parse (and created) JSON documents
> > - */
> > - AbstractElasticsearchTable(String indexName, String typeName,
> ObjectMapper mapper) {
> > - super(Object[].class);
> > - this.indexName = Objects.requireNonNull(indexName, "indexName");
> > - this.typeName = Objects.requireNonNull(typeName, "typeName");
> > - this.mapper = Objects.requireNonNull(mapper, "mapper");
> > - }
> > -
> > - @Override public String toString() {
> > - return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> > - }
> > -
> > - public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> > - final RelDataType mapType = relDataTypeFactory.createMapType(
> > - relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> > - relDataTypeFactory.createTypeWithNullability(
> > - relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> > - true));
> > - return relDataTypeFactory.builder().add("_MAP", mapType).build();
> > - }
> > -
> > - public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
> SchemaPlus schema,
> > - String tableName) {
> > - return new ElasticsearchQueryable<>(queryProvider, schema, this,
> tableName);
> > - }
> > -
> > - public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
> relOptTable) {
> > - final RelOptCluster cluster = context.getCluster();
> > - return new ElasticsearchTableScan(cluster,
> cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> > - relOptTable, this, null);
> > - }
> > -
> > - /**
> > - * In ES 5.x scripted fields start with {@code params._source.foo}
> while in ES2.x
> > - * {@code _source.foo}. Helper method to build correct query based on
> runtime version of elastic.
> > - * Used to keep backwards compatibility with ES2.
> > - *
> > - * @see <a href="
> https://github.com/elastic/elasticsearch/issues/20068">_source
> variable</a>
> > - * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted
> Fields</a>
> > - * @return string to be used for scripted fields
> > - */
> > - protected abstract String scriptedFieldPrefix();
> > -
> > - /** Executes a "find" operation on the underlying type.
> > - *
> > - * <p>For example,
> > - * <code>client.prepareSearch(index).setTypes(type)
> > - * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> > - *
> > - * @param index Elasticsearch index
> > - * @param ops List of operations represented as Json strings.
> > - * @param fields List of fields to project; or null to return map
> > - * @return Enumerator of results
> > - */
> > - protected abstract Enumerable<Object> find(String index, List<String>
> ops,
> > - List<Map.Entry<String, Class>> fields);
> > -
> > - /**
> > - * Implementation of {@link Queryable} based on
> > - * a {@link AbstractElasticsearchTable}.
> > - *
> > - * @param <T> element type
> > - */
> > - public static class ElasticsearchQueryable<T> extends
> AbstractTableQueryable<T> {
> > - ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus
> schema,
> > - AbstractElasticsearchTable table, String tableName) {
> > - super(queryProvider, schema, table, tableName);
> > - }
> > -
> > - public Enumerator<T> enumerator() {
> > - return null;
> > - }
> > -
> > - private String getIndex() {
> > - return schema.unwrap(ElasticsearchSchema.class).getIndex();
> > - }
> > -
> > - private AbstractElasticsearchTable getTable() {
> > - return (AbstractElasticsearchTable) table;
> > - }
> > -
> > - /** Called via code-generation.
> > - * @param ops list of queries (as strings)
> > - * @param fields projection
> > - * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> > - * @return result as enumerable
> > - */
> > - @SuppressWarnings("UnusedDeclaration")
> > - public Enumerable<Object> find(List<String> ops,
> > - List<Map.Entry<String, Class>> fields) {
> > - return getTable().find(getIndex(), ops, fields);
> > - }
> > - }
> > -}
> > -
> > -// End AbstractElasticsearchTable.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > new file mode 100644
> > index 0000000..9627aca
> > --- /dev/null
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > @@ -0,0 +1,165 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.calcite.adapter.elasticsearch;
> > +
> > +import org.apache.calcite.plan.RelOptCluster;
> > +import org.apache.calcite.plan.RelOptCost;
> > +import org.apache.calcite.plan.RelOptPlanner;
> > +import org.apache.calcite.plan.RelTraitSet;
> > +import org.apache.calcite.rel.InvalidRelException;
> > +import org.apache.calcite.rel.RelNode;
> > +import org.apache.calcite.rel.core.Aggregate;
> > +import org.apache.calcite.rel.core.AggregateCall;
> > +import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > +import org.apache.calcite.rel.type.RelDataType;
> > +import org.apache.calcite.rel.type.RelDataTypeField;
> > +import org.apache.calcite.sql.SqlKind;
> > +import org.apache.calcite.util.ImmutableBitSet;
> > +
> > +import java.util.ArrayList;
> > +import java.util.EnumSet;
> > +import java.util.List;
> > +import java.util.Locale;
> > +import java.util.Set;
> > +
> > +/**
> > + * Implementation of
> > + * {@link org.apache.calcite.rel.core.Aggregate} relational expression
> > + * for ElasticSearch.
> > + */
> > +public class ElasticsearchAggregate extends Aggregate implements
> ElasticsearchRel {
> > +
> > + private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
> > + EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG,
> SqlKind.SUM);
> > +
> > + /** Creates an ElasticsearchAggregate. */
> > + ElasticsearchAggregate(RelOptCluster cluster,
> > + RelTraitSet traitSet,
> > + RelNode input,
> > + boolean indicator,
> > + ImmutableBitSet groupSet,
> > + List<ImmutableBitSet> groupSets,
> > + List<AggregateCall> aggCalls) throws InvalidRelException {
> > + super(cluster, traitSet, input, indicator, groupSet, groupSets,
> aggCalls);
> > +
> > + if (getConvention() != input.getConvention()) {
> > + String message = String.format(Locale.ROOT, "%s != %s",
> getConvention(),
> > + input.getConvention());
> > + throw new AssertionError(message);
> > + }
> > +
> > + assert getConvention() == input.getConvention();
> > + assert getConvention() == ElasticsearchRel.CONVENTION;
> > + assert this.groupSets.size() == 1 : "Grouping sets not supported";
> > +
> > + for (AggregateCall aggCall : aggCalls) {
> > + if (aggCall.isDistinct()) {
> > + throw new InvalidRelException("distinct aggregation not
> supported");
> > + }
> > +
> > + SqlKind kind = aggCall.getAggregation().getKind();
> > + if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
> > + final String message = String.format(Locale.ROOT,
> > + "Aggregation %s not supported (use one of %s)", kind,
> SUPPORTED_AGGREGATIONS);
> > + throw new InvalidRelException(message);
> > + }
> > + }
> > +
> > + if (getGroupType() != Group.SIMPLE) {
> > + final String message = String.format(Locale.ROOT, "Only %s
> grouping is supported. "
> > + + "Yours is %s", Group.SIMPLE, getGroupType());
> > + throw new InvalidRelException(message);
> > + }
> > +
> > + }
> > +
> > + @Override public Aggregate copy(RelTraitSet traitSet, RelNode input,
> boolean indicator,
> > + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
> > + List<AggregateCall> aggCalls) {
> > + try {
> > + return new ElasticsearchAggregate(getCluster(), traitSet, input,
> > + indicator, groupSet, groupSets,
> > + aggCalls);
> > + } catch (InvalidRelException e) {
> > + throw new AssertionError(e);
> > + }
> > + }
> > +
> > + @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
> RelMetadataQuery mq) {
> > + return super.computeSelfCost(planner, mq).multiplyBy(0.1);
> > + }
> > +
> > + @Override public void implement(Implementor implementor) {
> > + implementor.visitChild(0, getInput());
> > + List<String> inputFields = fieldNames(getInput().getRowType());
> > +
> > + for (int group : groupSet) {
> > + implementor.addGroupBy(inputFields.get(group));
> > + }
> > +
> > + for (AggregateCall aggCall : aggCalls) {
> > + List<String> names = new ArrayList<>();
> > + for (int i : aggCall.getArgList()) {
> > + names.add(inputFields.get(i));
> > + }
> > +
> > + final String name = names.isEmpty() ? ElasticsearchConstants.ID :
> names.get(0);
> > +
> > + String op = String.format(Locale.ROOT, "\"%s\":{\"field\":
> \"%s\"}",
> > + toElasticAggregate(aggCall),
> > + name);
> > +
> > + implementor.addAggregation(aggCall.getName(), op);
> > + }
> > + }
> > +
> > + /**
> > + * Most of the aggregations can be retrieved with a single
> > + * <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
> ">stats</a>
> > + * function. But currently only one-to-one mapping is supported
> between sql agg and elastic
> > + * aggregation.
> > + */
> > + private String toElasticAggregate(AggregateCall call) {
> > + SqlKind kind = call.getAggregation().getKind();
> > + switch (kind) {
> > + case COUNT:
> > + return call.isApproximate() ? "cardinality" : "value_count";
> > + case SUM:
> > + return "sum";
> > + case MIN:
> > + return "min";
> > + case MAX:
> > + return "max";
> > + case AVG:
> > + return "avg";
> > + default:
> > + throw new IllegalArgumentException("Unknown aggregation kind " +
> kind + " for " + call);
> > + }
> > + }
> > +
> > + private List<String> fieldNames(RelDataType relDataType) {
> > + List<String> names = new ArrayList<>();
> > +
> > + for (RelDataTypeField rdtf : relDataType.getFieldList()) {
> > + names.add(rdtf.getName());
> > + }
> > + return names;
> > + }
> > +
> > +}
> > +
> > +// End ElasticsearchAggregate.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > index ed628cc..2c4c42c 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > @@ -30,18 +30,9 @@ interface ElasticsearchConstants {
> > String FIELDS = "fields";
> > String SOURCE_PAINLESS = "params._source";
> > String SOURCE_GROOVY = "_source";
> > - String SOURCE = SOURCE_GROOVY;
> > String ID = "_id";
> > String UID = "_uid";
> >
> > - /* Aggregation pushdown operations supported */
> > - String AGG_SUM = "SUM";
> > - String AGG_SUM0 = "$SUM0";
> > - String AGG_COUNT = "COUNT";
> > - String AGG_MIN = "MIN";
> > - String AGG_MAX = "MAX";
> > - String AGG_AVG = "AVG";
> > -
> > Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
> >
> > }
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > index d87de7e..16ac92d 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > @@ -26,27 +26,27 @@ import java.util.Map;
> >
> > /**
> > * Util functions which convert
> > - * {@link
> org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
> > + * {@link ElasticsearchJson.SearchHit}
> > * into calcite specific return type (map, object[], list etc.)
> > */
> > class ElasticsearchEnumerators {
> >
> > private ElasticsearchEnumerators() {}
> >
> > - private static Function1<ElasticsearchSearchResult.SearchHit, Map>
> mapGetter() {
> > - return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
> > - public Map apply(ElasticsearchSearchResult.SearchHit hits) {
> > + private static Function1<ElasticsearchJson.SearchHit, Map>
> mapGetter() {
> > + return new Function1<ElasticsearchJson.SearchHit, Map>() {
> > + public Map apply(ElasticsearchJson.SearchHit hits) {
> > return hits.sourceOrFields();
> > }
> > };
> > }
> >
> > - private static Function1<ElasticsearchSearchResult.SearchHit, Object>
> singletonGetter(
> > + private static Function1<ElasticsearchJson.SearchHit, Object>
> singletonGetter(
> > final String fieldName,
> > final Class fieldClass) {
> > - return new Function1<ElasticsearchSearchResult.SearchHit, Object>()
> {
> > - public Object apply(ElasticsearchSearchResult.SearchHit hits) {
> > - return convert(hits.sourceOrFields(), fieldClass);
> > + return new Function1<ElasticsearchJson.SearchHit, Object>() {
> > + public Object apply(ElasticsearchJson.SearchHit hits) {
> > + return convert(hits.valueOrNull(fieldName), fieldClass);
> > }
> > };
> > }
> > @@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
> > *
> > * @return function that converts the search result into a generic
> array
> > */
> > - private static Function1<ElasticsearchSearchResult.SearchHit,
> Object[]> listGetter(
> > + private static Function1<ElasticsearchJson.SearchHit, Object[]>
> listGetter(
> > final List<Map.Entry<String, Class>> fields) {
> > - return new Function1<ElasticsearchSearchResult.SearchHit,
> Object[]>() {
> > - public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
> > + return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
> > + public Object[] apply(ElasticsearchJson.SearchHit hit) {
> > Object[] objects = new Object[fields.size()];
> > for (int i = 0; i < fields.size(); i++) {
> > final Map.Entry<String, Class> field = fields.get(i);
> > final String name = field.getKey();
> > final Class type = field.getValue();
> > - objects[i] = convert(hit.value(name), type);
> > + objects[i] = convert(hit.valueOrNull(name), type);
> > }
> > return objects;
> > }
> > };
> > }
> >
> > - static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
> > + static Function1<ElasticsearchJson.SearchHit, Object> getter(
> > List<Map.Entry<String, Class>> fields) {
> > //noinspection unchecked
> > - return fields == null
> > - ? (Function1) mapGetter()
> > - : fields.size() == 1
> > - ? singletonGetter(fields.get(0).getKey(),
> fields.get(0).getValue())
> > - : (Function1) listGetter(fields);
> > + final Function1 getter;
> > + if (fields == null || fields.size() == 1 &&
> "_MAP".equals(fields.get(0).getKey())) {
> > + // select * from table
> > + getter = mapGetter();
> > + } else if (fields.size() == 1) {
> > + // select foo from table
> > + getter = singletonGetter(fields.get(0).getKey(),
> fields.get(0).getValue());
> > + } else {
> > + // select a, b, c from table
> > + getter = listGetter(fields);
> > + }
> > +
> > + return getter;
> > }
> >
> > private static Object convert(Object o, Class clazz) {
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > index 4d187b1..c339671 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
> > import org.apache.calcite.plan.RelTraitSet;
> > import org.apache.calcite.rel.RelNode;
> > import org.apache.calcite.rel.core.Filter;
> > -import org.apache.calcite.rel.core.Project;
> > import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > import org.apache.calcite.rex.RexCall;
> > import org.apache.calcite.rex.RexInputRef;
> > @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter
> implements ElasticsearchRel {
> >
> > @Override public void implement(Implementor implementor) {
> > implementor.visitChild(0, getInput());
> > - List<String> fieldNames;
> > - if (input instanceof Project) {
> > - final List<RexNode> projects = ((Project) input).getProjects();
> > - fieldNames = new ArrayList<>(projects.size());
> > - for (RexNode project : projects) {
> > - String name =
> project.accept(MapProjectionFieldVisitor.INSTANCE);
> > - fieldNames.add(name);
> > - }
> > - } else {
> > - fieldNames =
> ElasticsearchRules.elasticsearchFieldNames(getRowType());
> > - }
> > ObjectMapper mapper = implementor.elasticsearchTable.mapper;
> > PredicateAnalyzerTranslator translator = new
> PredicateAnalyzerTranslator(mapper);
> > try {
> > implementor.add(translator.translateMatch(condition));
> > } catch (IOException e) {
> > throw new UncheckedIOException(e);
> > - } catch (ExpressionNotAnalyzableException e) {
> > + } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
> > throw new RuntimeException(e);
> > }
> > }
> > @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter
> implements ElasticsearchRel {
> > this.mapper = Objects.requireNonNull(mapper, "mapper");
> > }
> >
> > - String translateMatch(RexNode condition) throws IOException,
> ExpressionNotAnalyzableException {
> > + String translateMatch(RexNode condition) throws IOException,
> > + PredicateAnalyzer.ExpressionNotAnalyzableException {
> >
> > StringWriter writer = new StringWriter();
> > JsonGenerator generator =
> mapper.getFactory().createGenerator(writer);
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > new file mode 100644
> > index 0000000..7c80e82
> > --- /dev/null
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > @@ -0,0 +1,614 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.calcite.adapter.elasticsearch;
> > +
> > +import com.fasterxml.jackson.annotation.JsonCreator;
> > +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
> > +import com.fasterxml.jackson.annotation.JsonProperty;
> > +import com.fasterxml.jackson.core.JsonParser;
> > +import com.fasterxml.jackson.core.JsonProcessingException;
> > +import com.fasterxml.jackson.databind.DeserializationContext;
> > +import com.fasterxml.jackson.databind.JsonNode;
> > +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
> > +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
> > +import com.fasterxml.jackson.databind.node.ArrayNode;
> > +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> > +import com.fasterxml.jackson.databind.node.ObjectNode;
> > +
> > +import java.io.IOException;
> > +import java.time.Duration;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.LinkedHashMap;
> > +import java.util.List;
> > +import java.util.Locale;
> > +import java.util.Map;
> > +import java.util.Objects;
> > +import java.util.Set;
> > +import java.util.function.BiConsumer;
> > +import java.util.function.Consumer;
> > +import java.util.stream.StreamSupport;
> > +
> > +import static java.util.Collections.unmodifiableMap;
> > +
> > +/**
> > + * Internal objects (and deserializers) used to parse elastic search
> results
> > + * (which are in JSON format).
> > + *
> > + * <p>Since we're using the basic low-level REST client, the HTTP response has
> to be
> > + * processed manually using JSON (jackson) library.
> > + */
> > +class ElasticsearchJson {
> > +
> > + /**
> > + * Used as special aggregation key for missing values (documents
> which are missing a field).
> > + * Buckets with that value are then converted to {@code null}s in
> flat tabular format.
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing
> Value</a>
> > + */
> > + static final JsonNode MISSING_VALUE =
> JsonNodeFactory.instance.textNode("__MISSING__");
> > +
> > + private ElasticsearchJson() {}
> > +
> > + /**
> > + * Visits leaves of the aggregation where all values are stored.
> > + */
> > + static void visitValueNodes(Aggregations aggregations,
> Consumer<Map<String, Object>> consumer) {
> > + Objects.requireNonNull(aggregations, "aggregations");
> > + Objects.requireNonNull(consumer, "consumer");
> > +
> > + List<Bucket> buckets = new ArrayList<>();
> > +
> > + Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
> > +
> > + BiConsumer<RowKey, MultiValue> cons = (r, v) ->
> > + rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
> > + aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
> > + rows.forEach((k, v) -> {
> > + Map<String, Object> row = new LinkedHashMap<>(k.keys);
> > + v.forEach(val -> row.put(val.getName(), val.value()));
> > + consumer.accept(row);
> > + });
> > + }
> > +
> > + /**
> > + * Identifies a calcite row (as in relational algebra)
> > + */
> > + private static class RowKey {
> > + private final Map<String, Object> keys;
> > + private final int hashCode;
> > +
> > + private RowKey(final Map<String, Object> keys) {
> > + this.keys = Objects.requireNonNull(keys, "keys");
> > + this.hashCode = Objects.hashCode(keys);
> > + }
> > +
> > + private RowKey(List<Bucket> buckets) {
> > + this(toMap(buckets));
> > + }
> > +
> > + private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
> > + return StreamSupport.stream(buckets.spliterator(), false)
> > + .collect(LinkedHashMap::new,
> > + (m, v) -> m.put(v.getName(), v.key()),
> > + LinkedHashMap::putAll);
> > + }
> > +
> > + @Override public boolean equals(final Object o) {
> > + if (this == o) {
> > + return true;
> > + }
> > + if (o == null || getClass() != o.getClass()) {
> > + return false;
> > + }
> > + final RowKey rowKey = (RowKey) o;
> > + return hashCode == rowKey.hashCode
> > + && Objects.equals(keys, rowKey.keys);
> > + }
> > +
> > + @Override public int hashCode() {
> > + return this.hashCode;
> > + }
> > + }
> > +
> > + private static void visitValueNodes(Aggregation aggregation,
> List<Bucket> parents,
> > + BiConsumer<RowKey, MultiValue> consumer) {
> > +
> > + if (aggregation instanceof MultiValue) {
> > + // publish one value of the row
> > + RowKey key = new RowKey(parents);
> > + consumer.accept(key, (MultiValue) aggregation);
> > + return;
> > + }
> > +
> > + if (aggregation instanceof Bucket) {
> > + Bucket bucket = (Bucket) aggregation;
> > + parents.add(bucket);
> > + bucket.getAggregations().forEach(a -> visitValueNodes(a, parents,
> consumer));
> > + parents.remove(parents.size() - 1);
> > + } else if (aggregation instanceof HasAggregations) {
> > + HasAggregations children = (HasAggregations) aggregation;
> > + children.getAggregations().forEach(a -> visitValueNodes(a,
> parents, consumer));
> > + } else if (aggregation instanceof MultiBucketsAggregation) {
> > + MultiBucketsAggregation multi = (MultiBucketsAggregation)
> aggregation;
> > + multi.buckets().forEach(b -> {
> > + parents.add(b);
> > + b.getAggregations().forEach(a -> visitValueNodes(a, parents,
> consumer));
> > + parents.remove(parents.size() - 1);
> > + });
> > + }
> > +
> > + }
> > +
> > + /**
> > + * Response from Elastic
> > + */
> > + @JsonIgnoreProperties(ignoreUnknown = true)
> > + static class Result {
> > + private final SearchHits hits;
> > + private final Aggregations aggregations;
> > + private final long took;
> > +
> > + /**
> > + * Constructor for this instance.
> > + * @param hits list of matched documents
> > + * @param took time taken (in milliseconds) for this query to execute
> > + */
> > + @JsonCreator
> > + Result(@JsonProperty("hits") SearchHits hits,
> > + @JsonProperty("aggregations") Aggregations aggregations,
> > + @JsonProperty("took") long took) {
> > + this.hits = Objects.requireNonNull(hits, "hits");
> > + this.aggregations = aggregations;
> > + this.took = took;
> > + }
> > +
> > + SearchHits searchHits() {
> > + return hits;
> > + }
> > +
> > + Aggregations aggregations() {
> > + return aggregations;
> > + }
> > +
> > + public Duration took() {
> > + return Duration.ofMillis(took);
> > + }
> > +
> > + }
> > +
> > + /**
> > + * Similar to {@code SearchHits} in ES. Container for {@link
> SearchHit}
> > + */
> > + @JsonIgnoreProperties(ignoreUnknown = true)
> > + static class SearchHits {
> > +
> > + private final long total;
> > + private final List<SearchHit> hits;
> > +
> > + @JsonCreator
> > + SearchHits(@JsonProperty("total")final long total,
> > + @JsonProperty("hits") final List<SearchHit> hits) {
> > + this.total = total;
> > + this.hits = Objects.requireNonNull(hits, "hits");
> > + }
> > +
> > + public List<SearchHit> hits() {
> > + return this.hits;
> > + }
> > +
> > + public long total() {
> > + return total;
> > + }
> > +
> > + }
> > +
> > + /**
> > + * Concrete result record which matched the query. Similar to {@code
> SearchHit} in ES.
> > + */
> > + @JsonIgnoreProperties(ignoreUnknown = true)
> > + static class SearchHit {
> > + private final String id;
> > + private final Map<String, Object> source;
> > + private final Map<String, Object> fields;
> > +
> > + @JsonCreator
> > + SearchHit(@JsonProperty("_id") final String id,
> > + @JsonProperty("_source") final Map<String,
> Object> source,
> > + @JsonProperty("fields") final Map<String, Object>
> fields) {
> > + this.id = Objects.requireNonNull(id, "id");
> > +
> > + // both can't be null
> > + if (source == null && fields == null) {
> > + final String message = String.format(Locale.ROOT,
> > + "Both '_source' and 'fields' are missing for %s", id);
> > + throw new IllegalArgumentException(message);
> > + }
> > +
> > + // both can't be non-null
> > + if (source != null && fields != null) {
> > + final String message = String.format(Locale.ROOT,
> > + "Both '_source' and 'fields' are populated (non-null) for
> %s", id);
> > + throw new IllegalArgumentException(message);
> > + }
> > +
> > + this.source = source;
> > + this.fields = fields;
> > + }
> > +
> > + /**
> > + * Returns id of this hit (usually document id)
> > + * @return unique id
> > + */
> > + public String id() {
> > + return id;
> > + }
> > +
> > + Object valueOrNull(String name) {
> > + Objects.requireNonNull(name, "name");
> > + if (fields != null && fields.containsKey(name)) {
> > + Object field = fields.get(name);
> > + if (field instanceof Iterable) {
> > + // return first element (or null)
> > + Iterator<?> iter = ((Iterable<?>) field).iterator();
> > + return iter.hasNext() ? iter.next() : null;
> > + }
> > +
> > + return field;
> > + }
> > +
> > + return valueFromPath(source, name);
> > + }
> > +
> > + /**
> > + * Returns property from nested maps given a path like {@code
> a.b.c}.
> > + * @param map current map
> > + * @param path field path(s), optionally with dots ({@code a.b.c}).
> > + * @return value located at path {@code path} or {@code null} if
> not found.
> > + */
> > + private static Object valueFromPath(Map<String, Object> map, String
> path) {
> > + if (map == null) {
> > + return null;
> > + }
> > +
> > + if (map.containsKey(path)) {
> > + return map.get(path);
> > + }
> > +
> > + // maybe pattern of type a.b.c
> > + final int index = path.indexOf('.');
> > + if (index == -1) {
> > + return null;
> > + }
> > +
> > + final String prefix = path.substring(0, index);
> > + final String suffix = path.substring(index + 1);
> > +
> > + Object maybeMap = map.get(prefix);
> > + if (maybeMap instanceof Map) {
> > + return valueFromPath((Map<String, Object>) maybeMap, suffix);
> > + }
> > +
> > + return null;
> > + }
> > +
> > + Map<String, Object> source() {
> > + return source;
> > + }
> > +
> > + Map<String, Object> fields() {
> > + return fields;
> > + }
> > +
> > + Map<String, Object> sourceOrFields() {
> > + return source != null ? source : fields;
> > + }
> > + }
> > +
> > +
> > + /**
> > + * {@link Aggregation} container.
> > + */
> > + @JsonDeserialize(using = AggregationsDeserializer.class)
> > + static class Aggregations implements Iterable<Aggregation> {
> > +
> > + private final List<? extends Aggregation> aggregations;
> > + private Map<String, Aggregation> aggregationsAsMap;
> > +
> > + Aggregations(List<? extends Aggregation> aggregations) {
> > + this.aggregations = Objects.requireNonNull(aggregations,
> "aggregations");
> > + }
> > +
> > + /**
> > + * Iterates over the {@link Aggregation}s.
> > + */
> > + @Override public final Iterator<Aggregation> iterator() {
> > + return asList().iterator();
> > + }
> > +
> > + /**
> > + * The list of {@link Aggregation}s.
> > + */
> > + final List<Aggregation> asList() {
> > + return Collections.unmodifiableList(aggregations);
> > + }
> > +
> > + /**
> > + * Returns the {@link Aggregation}s keyed by aggregation name. Lazy
> init.
> > + */
> > + final Map<String, Aggregation> asMap() {
> > + if (aggregationsAsMap == null) {
> > + Map<String, Aggregation> map = new
> LinkedHashMap<>(aggregations.size());
> > + for (Aggregation aggregation : aggregations) {
> > + map.put(aggregation.getName(), aggregation);
> > + }
> > + this.aggregationsAsMap = unmodifiableMap(map);
> > + }
> > + return aggregationsAsMap;
> > + }
> > +
> > + /**
> > + * Returns the aggregation that is associated with the specified
> name.
> > + */
> > + @SuppressWarnings("unchecked")
> > + public final <A extends Aggregation> A get(String name) {
> > + return (A) asMap().get(name);
> > + }
> > +
> > + @Override public final boolean equals(Object obj) {
> > + if (obj == null || getClass() != obj.getClass()) {
> > + return false;
> > + }
> > + return aggregations.equals(((Aggregations) obj).aggregations);
> > + }
> > +
> > + @Override public final int hashCode() {
> > + return Objects.hash(getClass(), aggregations);
> > + }
> > +
> > + }
> > +
> > + /**
> > + * Identifies all aggregations
> > + */
> > + interface Aggregation {
> > +
> > + /**
> > + * @return The name of this aggregation.
> > + */
> > + String getName();
> > +
> > + }
> > +
> > + /**
> > + * Allows traversing aggregations tree
> > + */
> > + interface HasAggregations {
> > + Aggregations getAggregations();
> > + }
> > +
> > + /**
> > + * An aggregation that returns multiple buckets
> > + */
> > + static class MultiBucketsAggregation implements Aggregation {
> > +
> > + private final String name;
> > + private final List<Bucket> buckets;
> > +
> > + MultiBucketsAggregation(final String name,
> > + final List<Bucket> buckets) {
> > + this.name = name;
> > + this.buckets = buckets;
> > + }
> > +
> > + /**
> > + * @return The buckets of this aggregation.
> > + */
> > + List<Bucket> buckets() {
> > + return buckets;
> > + }
> > +
> > + @Override public String getName() {
> > + return name;
> > + }
> > + }
> > +
> > + /**
> > + * A bucket represents a criterion that all documents falling into
> it satisfy.
> > + * It is also uniquely identified
> > + * by a key, and can potentially hold sub-aggregations computed over
> all documents in it.
> > + */
> > + static class Bucket implements HasAggregations, Aggregation {
> > + private final Object key;
> > + private final String name;
> > + private final Aggregations aggregations;
> > +
> > + Bucket(final Object key,
> > + final String name,
> > + final Aggregations aggregations) {
> > + this.key = key; // key may be null (bucket for documents missing the field)
> > + this.name = Objects.requireNonNull(name, "name");
> > + this.aggregations = Objects.requireNonNull(aggregations,
> "aggregations");
> > + }
> > +
> > + /**
> > + * @return The key associated with the bucket
> > + */
> > + Object key() {
> > + return key;
> > + }
> > +
> > + /**
> > + * @return The key associated with the bucket as a string
> > + */
> > + String keyAsString() {
> > + return Objects.toString(key());
> > + }
> > +
> > + /**
> > + * @return The sub-aggregations of this bucket
> > + */
> > + @Override public Aggregations getAggregations() {
> > + return aggregations;
> > + }
> > +
> > + @Override public String getName() {
> > + return name;
> > + }
> > + }
> > +
> > + /**
> > + * Multi-value aggregation like
> > + * <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
> ">Stats</a>
> > + */
> > + static class MultiValue implements Aggregation {
> > + private final String name;
> > + private final Map<String, Object> values;
> > +
> > + MultiValue(final String name, final Map<String, Object> values) {
> > + this.name = Objects.requireNonNull(name, "name");
> > + this.values = Objects.requireNonNull(values, "values");
> > + }
> > +
> > + @Override public String getName() {
> > + return name;
> > + }
> > +
> > + Map<String, Object> values() {
> > + return values;
> > + }
> > +
> > + /**
> > + * For single value. Returns single value represented by this leaf
> aggregation.
> > + * @return value corresponding to {@code value}
> > + */
> > + Object value() {
> > + if (!values().containsKey("value")) {
> > + throw new IllegalStateException("'value' field not present in
> this aggregation");
> > + }
> > +
> > + return values().get("value");
> > + }
> > +
> > + }
> > +
> > + /**
> > + * Deserializes nested aggregation structures.
> > + */
> > + static class AggregationsDeserializer extends
> StdDeserializer<Aggregations> {
> > +
> > + private static final Set<String> IGNORE_TOKENS = new
> HashSet<>(Arrays.asList("meta",
> > + "buckets", "value", "values", "value_as_string", "doc_count",
> "key", "key_as_string"));
> > +
> > + AggregationsDeserializer() {
> > + super(Aggregations.class);
> > + }
> > +
> > + @Override public Aggregations deserialize(final JsonParser parser,
> > + final DeserializationContext ctxt)
> > + throws IOException {
> > +
> > + ObjectNode node = parser.getCodec().readTree(parser);
> > + return parseAggregations(parser, node);
> > + }
> > +
> > + private static Aggregations parseAggregations(JsonParser parser,
> ObjectNode node)
> > + throws JsonProcessingException {
> > +
> > + List<Aggregation> aggregations = new ArrayList<>();
> > +
> > + Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
> > + for (Map.Entry<String, JsonNode> entry : iter) {
> > + final String name = entry.getKey();
> > + final JsonNode value = entry.getValue();
> > +
> > + Aggregation agg = null;
> > + if (value.has("buckets")) {
> > + agg = parseBuckets(parser, name, (ArrayNode)
> value.get("buckets"));
> > + } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
> > + // leaf
> > + agg = parseValue(parser, name, (ObjectNode) value);
> > + }
> > +
> > + if (agg != null) {
> > + aggregations.add(agg);
> > + }
> > + }
> > +
> > + return new Aggregations(aggregations);
> > + }
> > +
> > +
> > +
> > + private static MultiValue parseValue(JsonParser parser, String
> name, ObjectNode node)
> > + throws JsonProcessingException {
> > +
> > + return new MultiValue(name, parser.getCodec().treeToValue(node,
> Map.class));
> > + }
> > +
> > + private static Aggregation parseBuckets(JsonParser parser, String
> name, ArrayNode nodes)
> > + throws JsonProcessingException {
> > +
> > + List<Bucket> buckets = new ArrayList<>(nodes.size());
> > + for (JsonNode b: nodes) {
> > + buckets.add(parseBucket(parser, name, (ObjectNode) b));
> > + }
> > +
> > + return new MultiBucketsAggregation(name, buckets);
> > + }
> > +
> > + /**
> > + * Determines if current key is a missing field key. Missing key is
> returned when document
> > + * does not have pivoting attribute (example {@code GROUP BY
> _MAP['a.b.missing']}). It helps
> > + * grouping documents which don't have a field. In relational
> algebra this
> > + * would be {@code null}.
> > + *
> > + * @param key current {@code key} (usually string) as returned by ES
> > + * @return {@code true} if this key denotes the missing-value bucket
> > + * @see #MISSING_VALUE
> > + */
> > + private static boolean isMissingBucket(JsonNode key) {
> > + return MISSING_VALUE.equals(key);
> > + }
> > +
> > + private static Bucket parseBucket(JsonParser parser, String name,
> ObjectNode node)
> > + throws JsonProcessingException {
> > +
> > + final JsonNode keyNode = node.get("key");
> > + final Object key;
> > + if (isMissingBucket(keyNode) || keyNode.isNull()) {
> > + key = null;
> > + } else if (keyNode.isTextual()) {
> > + key = keyNode.textValue();
> > + } else if (keyNode.isNumber()) {
> > + key = keyNode.numberValue();
> > + } else if (keyNode.isBoolean()) {
> > + key = keyNode.booleanValue();
> > + } else {
> > + // don't usually expect keys to be Objects
> > + key = parser.getCodec().treeToValue(node, Map.class);
> > + }
> > +
> > + return new Bucket(key, name, parseAggregations(parser, node));
> > + }
> > +
> > + }
> > +}
> > +
> > +// End ElasticsearchJson.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > index 72753e6..709156f 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > @@ -27,8 +27,17 @@ import java.util.List;
> > * Builtin methods in the Elasticsearch adapter.
> > */
> > enum ElasticsearchMethod {
> > -
> ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
> > - "find", List.class, List.class);
> > +
> > +
> ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class,
> > + "find",
> > + List.class, // ops - projections and other stuff
> > + List.class, // fields
> > + List.class, // sort
> > + List.class, // groupBy
> > + List.class, // aggregations
> > + Long.class, // offset
> > + Long.class // fetch
> > + );
> >
> > public final Method method;
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > index 7d5811c..d0841c3 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project
> implements ElasticsearchRel {
> > query.append("\"script_fields\": {" + String.join(", ",
> scriptFields) + "}");
> > }
> >
> > - for (String opfield : implementor.list) {
> > - if (opfield.startsWith("\"_source\"")) {
> > - implementor.list.remove(opfield);
> > - }
> > - }
> > + implementor.list.removeIf(l -> l.startsWith("\"_source\""));
> > implementor.add(query.toString());
> > }
> > }
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > index 436adf9..1dad691 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch;
> >
> > import org.apache.calcite.plan.Convention;
> > import org.apache.calcite.plan.RelOptTable;
> > +import org.apache.calcite.rel.RelFieldCollation;
> > import org.apache.calcite.rel.RelNode;
> > +import org.apache.calcite.util.Pair;
> >
> > import java.util.ArrayList;
> > import java.util.List;
> > +import java.util.Map;
> > +import java.util.Objects;
> >
> > /**
> > * Relational expression that uses Elasticsearch calling convention.
> > @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode {
> > * {@link ElasticsearchRel} nodes into an Elasticsearch query.
> > */
> > class Implementor {
> > +
> > final List<String> list = new ArrayList<>();
> >
> > + /**
> > + * Sorting clauses.
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html
> ">Sort</a>
> > + */
> > + final List<Map.Entry<String, RelFieldCollation.Direction>> sort =
> new ArrayList<>();
> > +
> > + /**
> > + * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements
> (functions).
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
> ">aggregations</a>
> > + */
> > + final List<Map.Entry<String, String>> aggregations = new
> ArrayList<>();
> > +
> > + /**
> > + * Allows bucketing documents together. Similar to {@code select
> ... from table group by field1}
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html">Bucket
> Aggregations</a>
> > + */
> > + final List<String> groupBy = new ArrayList<>();
> > +
> > + /**
> > + * Starting index (default {@code 0}). Equivalent to {@code from}
> in ES query.
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html
> ">From/Size</a>
> > + */
> > + Long offset;
> > +
> > + /**
> > + * Number of records to return. Equivalent to {@code size} in ES
> query.
> > + * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html
> ">From/Size</a>
> > + */
> > + Long fetch;
> > +
> > RelOptTable table;
> > - AbstractElasticsearchTable elasticsearchTable;
> > + ElasticsearchTable elasticsearchTable;
> >
> > - public void add(String findOp) {
> > + void add(String findOp) {
> > list.add(findOp);
> > }
> >
> > - public void visitChild(int ordinal, RelNode input) {
> > + void addGroupBy(String field) {
> > + Objects.requireNonNull(field, "field");
> > + groupBy.add(field);
> > + }
> > +
> > + void addSort(String field, RelFieldCollation.Direction direction) {
> > + Objects.requireNonNull(field, "field");
> > + sort.add(new Pair<>(field, direction));
> > + }
> > +
> > + void addAggregation(String field, String expression) {
> > + Objects.requireNonNull(field, "field");
> > + Objects.requireNonNull(expression, "expression");
> > + aggregations.add(new Pair<>(field, expression));
> > + }
> > +
> > + void offset(long offset) {
> > + this.offset = offset;
> > + }
> > +
> > + void fetch(long fetch) {
> > + this.fetch = fetch;
> > + }
> > +
> > + void visitChild(int ordinal, RelNode input) {
> > assert ordinal == 0;
> > ((ElasticsearchRel) input).implement(this);
> > }
> > +
> > }
> > }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > index 97e934c..b442ddd 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention;
> > import org.apache.calcite.plan.RelOptRule;
> > import org.apache.calcite.plan.RelTrait;
> > import org.apache.calcite.plan.RelTraitSet;
> > +import org.apache.calcite.rel.InvalidRelException;
> > import org.apache.calcite.rel.RelCollations;
> > import org.apache.calcite.rel.RelNode;
> > import org.apache.calcite.rel.convert.ConverterRule;
> > import org.apache.calcite.rel.core.Sort;
> > +import org.apache.calcite.rel.logical.LogicalAggregate;
> > import org.apache.calcite.rel.logical.LogicalFilter;
> > import org.apache.calcite.rel.logical.LogicalProject;
> > import org.apache.calcite.rel.type.RelDataType;
> > @@ -53,7 +55,8 @@ class ElasticsearchRules {
> > static final RelOptRule[] RULES = {
> > ElasticsearchSortRule.INSTANCE,
> > ElasticsearchFilterRule.INSTANCE,
> > - ElasticsearchProjectRule.INSTANCE
> > + ElasticsearchProjectRule.INSTANCE,
> > + ElasticsearchAggregateRule.INSTANCE
> > };
> >
> > private ElasticsearchRules() {}
> > @@ -147,7 +150,7 @@ class ElasticsearchRules {
> > }
> > }
> > throw new IllegalArgumentException("Translation of " +
> call.toString()
> > - + "is not supported by ElasticsearchProject");
> > + + " is not supported by ElasticsearchProject");
> > }
> >
> > List<String> visitList(List<RexNode> list) {
> > @@ -217,6 +220,37 @@ class ElasticsearchRules {
> > }
> >
> > /**
> > + * Rule to convert an {@link
> org.apache.calcite.rel.logical.LogicalAggregate}
> > + * to an {@link ElasticsearchAggregate}.
> > + */
> > + private static class ElasticsearchAggregateRule extends
> ElasticsearchConverterRule {
> > + static final RelOptRule INSTANCE = new ElasticsearchAggregateRule();
> > +
> > + private ElasticsearchAggregateRule() {
> > + super(LogicalAggregate.class, Convention.NONE,
> ElasticsearchRel.CONVENTION,
> > + "ElasticsearchAggregateRule");
> > + }
> > +
> > + public RelNode convert(RelNode rel) {
> > + final LogicalAggregate agg = (LogicalAggregate) rel;
> > + final RelTraitSet traitSet = agg.getTraitSet().replace(out);
> > + try {
> > + return new ElasticsearchAggregate(
> > + rel.getCluster(),
> > + traitSet,
> > + convert(agg.getInput(), traitSet.simplify()),
> > + agg.indicator,
> > + agg.getGroupSet(),
> > + agg.getGroupSets(),
> > + agg.getAggCallList());
> > + } catch (InvalidRelException e) {
> > + return null;
> > + }
> > + }
> > + }
> > +
> > +
> > + /**
> > * Rule to convert a {@link
> org.apache.calcite.rel.logical.LogicalProject}
> > * to an {@link ElasticsearchProject}.
> > */
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > index 1c630ad..80a94be 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient;
> > import java.io.IOException;
> > import java.io.InputStream;
> > import java.io.UncheckedIOException;
> > +import java.util.Collections;
> > import java.util.Locale;
> > import java.util.Map;
> > import java.util.Objects;
> > @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> >
> > private final ObjectMapper mapper;
> >
> > + private final Map<String, Table> tableMap;
> > +
> > /**
> > * Allows schema to be instantiated from existing elastic search
> client.
> > * This constructor is used in tests.
> > @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> > * @param index name of ES index
> > */
> > public ElasticsearchSchema(RestClient client, ObjectMapper mapper,
> String index) {
> > + this(client, mapper, index, null);
> > + }
> > +
> > + public ElasticsearchSchema(RestClient client, ObjectMapper mapper,
> String index, String type) {
> > super();
> > this.client = Objects.requireNonNull(client, "client");
> > this.mapper = Objects.requireNonNull(mapper, "mapper");
> > this.index = Objects.requireNonNull(index, "index");
> > + if (type == null) {
> > + try {
> > + this.tableMap = createTables(listTypesFromElastic());
> > + } catch (IOException e) {
> > + throw new UncheckedIOException("Couldn't get types for " +
> index, e);
> > + }
> > + } else {
> > + this.tableMap = createTables(Collections.singleton(type));
> > + }
> > }
> >
> > @Override protected Map<String, Table> getTableMap() {
> > + return tableMap;
> > + }
> > +
> > + private Map<String, Table> createTables(Iterable<String> types) {
> > final ImmutableMap.Builder<String, Table> builder =
> ImmutableMap.builder();
> > - try {
> > - for (String type: listTypes()) {
> > - builder.put(type, new ElasticsearchTable(client, mapper, index,
> type));
> > - }
> > - } catch (IOException e) {
> > - throw new UncheckedIOException("Failed to get types for " +
> index, e);
> > + for (String type : types) {
> > + builder.put(type, new ElasticsearchTable(client, mapper, index,
> type));
> > }
> > return builder.build();
> > }
> > @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> > * @throws IOException for any IO related issues
> > * @throws IllegalStateException if reply is not understood
> > */
> > - private Set<String> listTypes() throws IOException {
> > + private Set<String> listTypesFromElastic() throws IOException {
> > final String endpoint = "/" + index + "/_mapping";
> > final Response response = client.performRequest("GET", endpoint);
> > try (InputStream is = response.getEntity().getContent()) {
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > index ed669aa..9078b72 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet;
> > import org.apache.calcite.rel.RelCollation;
> > import org.apache.calcite.rel.RelFieldCollation;
> > import org.apache.calcite.rel.RelNode;
> > -import org.apache.calcite.rel.core.Project;
> > import org.apache.calcite.rel.core.Sort;
> > import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > import org.apache.calcite.rel.type.RelDataTypeField;
> > import org.apache.calcite.rex.RexLiteral;
> > import org.apache.calcite.rex.RexNode;
> > -import org.apache.calcite.util.Util;
> >
> > -import java.util.ArrayList;
> > import java.util.List;
> >
> > /**
> > @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort
> implements ElasticsearchRel {
> >
> > @Override public void implement(Implementor implementor) {
> > implementor.visitChild(0, getInput());
> > - if (!collation.getFieldCollations().isEmpty()) {
> > - final List<String> keys = new ArrayList<>();
> > - if (input instanceof Project) {
> > - final List<RexNode> projects = ((Project) input).getProjects();
> > + final List<RelDataTypeField> fields = getRowType().getFieldList();
> >
> > - for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > - RexNode project =
> projects.get(fieldCollation.getFieldIndex());
> > - String name =
> project.accept(MapProjectionFieldVisitor.INSTANCE);
> > - keys.add(ElasticsearchRules.quote(name) + ": " +
> direction(fieldCollation));
> > - }
> > - } else {
> > - final List<RelDataTypeField> fields =
> getRowType().getFieldList();
> > -
> > - for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > - final String name =
> fields.get(fieldCollation.getFieldIndex()).getName();
> > - keys.add(ElasticsearchRules.quote(name) + ": " +
> direction(fieldCollation));
> > - }
> > - }
> > -
> > - implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {",
> "}") + "]");
> > + for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > + final String name =
> fields.get(fieldCollation.getFieldIndex()).getName();
> > + implementor.addSort(name, fieldCollation.getDirection());
> > }
> >
> > if (offset != null) {
> > - implementor.add("\"from\": " + ((RexLiteral) offset).getValue