You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by se...@apache.org on 2019/01/29 03:11:00 UTC
[calcite] branch master updated: [CALCITE-2814] ElasticSearch
adapter. Fix "group by" when using raw item access (eg. _MAP['foo'])
This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 542c086 [CALCITE-2814] ElasticSearch adapter. Fix "group by" when using raw item access (eg. _MAP['foo'])
542c086 is described below
commit 542c086749d504de5542c37f12f6a1329842f18d
Author: Andrei Sereda <25...@users.noreply.github.com>
AuthorDate: Sat Jan 26 23:27:33 2019 -0500
[CALCITE-2814] ElasticSearch adapter. Fix "group by" when using raw item access (eg. _MAP['foo'])
The following queries were previsouly failing:
```sql
select max(_MAP['a']), _MAP['b'] from elastic group by _MAP['b']
```
---
.../elasticsearch/ElasticsearchAggregate.java | 7 +++---
.../elasticsearch/ElasticsearchEnumerators.java | 26 +++++++++++++++++-----
.../adapter/elasticsearch/ElasticsearchMethod.java | 3 ++-
.../adapter/elasticsearch/ElasticsearchRel.java | 5 +++--
.../adapter/elasticsearch/ElasticsearchSort.java | 20 ++---------------
.../adapter/elasticsearch/ElasticsearchTable.java | 6 ++---
.../ElasticsearchToEnumerableConverter.java | 5 ++---
.../elasticsearch/ElasticSearchAdapterTest.java | 7 ++++++
8 files changed, 44 insertions(+), 35 deletions(-)
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
index 8ca1397..66dfc43 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -113,9 +113,10 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
@Override public void implement(Implementor implementor) {
implementor.visitChild(0, getInput());
- List<String> inputFields = fieldNames(getInput().getRowType());
+ final List<String> inputFields = fieldNames(getInput().getRowType());
for (int group : groupSet) {
- implementor.addGroupBy(inputFields.get(group));
+ final String name = inputFields.get(group);
+ implementor.addGroupBy(implementor.expressionItemMap.getOrDefault(name, name));
}
final ObjectMapper mapper = implementor.elasticsearchTable.mapper;
@@ -130,7 +131,7 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe
final ObjectNode field = aggregation.with(toElasticAggregate(aggCall));
final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
- field.put("field", name);
+ field.put("field", implementor.expressionItemMap.getOrDefault(name, name));
if (aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE) {
field.put("size", 1);
}
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
index be2090a..054d85c 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
@@ -42,12 +42,20 @@ class ElasticsearchEnumerators {
final Class fieldClass,
final Map<String, String> mapping) {
return hit -> {
+ final String key;
+ if (hit.sourceOrFields().containsKey(fieldName)) {
+ key = fieldName;
+ } else {
+ key = mapping.getOrDefault(fieldName, fieldName);
+ }
+
final Object value;
- if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) {
+ if (ElasticsearchConstants.ID.equals(key)
+ || ElasticsearchConstants.ID.equals(mapping.getOrDefault(fieldName, fieldName))) {
// is the original projection on _id field ?
value = hit.id();
} else {
- value = hit.valueOrNull(fieldName);
+ value = hit.valueOrNull(key);
}
return convert(value, fieldClass);
};
@@ -67,13 +75,21 @@ class ElasticsearchEnumerators {
Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
final Map.Entry<String, Class> field = fields.get(i);
- final Object value;
+ final String key;
+ if (hit.sourceOrFields().containsKey(field.getKey())) {
+ key = field.getKey();
+ } else {
+ key = mapping.getOrDefault(field.getKey(), field.getKey());
+ }
- if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) {
+ final Object value;
+ if (ElasticsearchConstants.ID.equals(key)
+ || ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))
+ || ElasticsearchConstants.ID.equals(field.getKey())) {
// is the original projection on _id field ?
value = hit.id();
} else {
- value = hit.valueOrNull(field.getKey());
+ value = hit.valueOrNull(key);
}
final Class type = field.getValue();
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
index 1e8e13e..f2da014 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Method;
import java.util.List;
+import java.util.Map;
/**
* Builtin methods in the Elasticsearch adapter.
@@ -35,7 +36,7 @@ enum ElasticsearchMethod {
List.class, // sort
List.class, // groupBy
List.class, // aggregations
- List.class, // expression mapping
+ Map.class, // item to expression mapping. Eg. _MAP['a.b.c'] and EXPR$1
Long.class, // offset
Long.class); // fetch
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
index c0a9ce0..022a867 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -24,6 +24,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.Pair;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -72,7 +73,7 @@ public interface ElasticsearchRel extends RelNode {
*
* @see SqlStdOperatorTable#ITEM
*/
- final List<Map.Entry<String, String>> expressionItemMap = new ArrayList<>();
+ final Map<String, String> expressionItemMap = new LinkedHashMap<>();
/**
* Starting index (default {@code 0}). Equivalent to {@code start} in ES query.
@@ -112,7 +113,7 @@ public interface ElasticsearchRel extends RelNode {
void addExpressionItemMapping(String expressionId, String item) {
Objects.requireNonNull(expressionId, "expressionId");
Objects.requireNonNull(item, "item");
- expressionItemMap.add(new Pair<>(expressionId, item));
+ expressionItemMap.put(expressionId, item);
}
void offset(long offset) {
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
index 7adab31..5dae378 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -30,9 +30,6 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
/**
* Implementation of {@link org.apache.calcite.rel.core.Sort}
@@ -61,21 +58,8 @@ public class ElasticsearchSort extends Sort implements ElasticsearchRel {
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
final String name = fields.get(fieldCollation.getFieldIndex()).getName();
- // TODO there should be a better way to extract original ITEM
- if (name.toUpperCase(Locale.ROOT).startsWith("EXPR$")) {
- Optional<String> item = implementor.expressionItemMap.stream()
- .filter(m -> name.equals(m.getKey()))
- .map(Map.Entry::getValue).findAny();
-
- if (!item.isPresent()) {
- final String message = String.format(Locale.ROOT, "No mapping found for %s", name);
- throw new IllegalStateException(message);
- }
-
- item.ifPresent(m -> implementor.addSort(m, fieldCollation.getDirection()));
- } else {
- implementor.addSort(name, fieldCollation.getDirection());
- }
+ final String rawName = implementor.expressionItemMap.getOrDefault(name, name);
+ implementor.addSort(rawName, fieldCollation.getDirection());
}
if (offset != null) {
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index f4d0028..29dc62a 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -122,7 +122,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
- List<Map.Entry<String, String>> mappings,
+ Map<String, String> mappings,
Long offset, Long fetch) throws IOException {
if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
@@ -172,7 +172,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
- List<Map.Entry<String, String>> mapping,
+ Map<String, String> mapping,
Long offset, Long fetch) throws IOException {
if (!groupBy.isEmpty() && offset != null) {
@@ -359,7 +359,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
- List<Map.Entry<String, String>> mappings,
+ Map<String, String> mappings,
Long offset, Long fetch) {
try {
return getTable().find(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch);
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
index 8a62728..f301a12 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -82,15 +82,14 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
final Expression table = block.append("table",
implementor.table
.getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
- List<String> opList = implementor.list;
- final Expression ops = block.append("ops", constantArrayList(opList, String.class));
+ final Expression ops = block.append("ops", Expressions.constant(implementor.list));
final Expression sort = block.append("sort", constantArrayList(implementor.sort, Pair.class));
final Expression groupBy = block.append("groupBy", Expressions.constant(implementor.groupBy));
final Expression aggregations = block.append("aggregations",
constantArrayList(implementor.aggregations, Pair.class));
final Expression mappings = block.append("mappings",
- constantArrayList(implementor.expressionItemMap, Pair.class));
+ Expressions.constant(implementor.expressionItemMap));
final Expression offset = block.append("offset", Expressions.constant(implementor.offset));
final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch));
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 507d106..5973436 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -292,6 +292,13 @@ public class ElasticSearchAdapterTest {
.returns(sortedResultSetChecker("city", RelFieldCollation.Direction.DESCENDING))
.returnsCount(ZIPS_SIZE);
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select max(_MAP['pop']), min(_MAP['pop']), _MAP['state'] from elastic.zips "
+ + "group by _MAP['state'] order by _MAP['state'] limit 3")
+ .returnsOrdered("EXPR$0=32383.0; EXPR$1=23238.0; EXPR$2=AK",
+ "EXPR$0=44165.0; EXPR$1=42124.0; EXPR$2=AL",
+ "EXPR$0=53532.0; EXPR$1=37428.0; EXPR$2=AR");
}
/**