You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2018/07/20 17:41:47 UTC
[34/53] [abbrv] calcite git commit: [CALCITE-2376] Unify ES2 and ES5
adapters. Migrate to low-level ES rest client as main transport. (Andrei
Sereda)
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
new file mode 100644
index 0000000..c84d2c7
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.JsonBuilder;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Elasticsearch.
+ */
+public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
+ /**
+ * Creates an ElasticsearchFilter.
+ *
+ * <p>When assertions are enabled, verifies that this node uses
+ * {@link ElasticsearchRel#CONVENTION} and that {@code child} has the same
+ * convention.
+ *
+ * @param cluster cluster handed to the {@link Filter} super-constructor
+ * @param traitSet trait set handed to the super-constructor
+ * @param child input relational expression
+ * @param condition filter condition
+ */
+ ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+ RexNode condition) {
+ super(cluster, traitSet, child, condition);
+ assert getConvention() == ElasticsearchRel.CONVENTION;
+ assert getConvention() == child.getConvention();
+ }
+
+  /** Returns the cost computed by {@link Filter}, scaled by a factor of 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return baseCost.multiplyBy(0.1);
+  }
+
+  /**
+   * Creates a new ElasticsearchFilter in this node's cluster with the given
+   * trait set, input and condition.
+   */
+  @Override public Filter copy(RelTraitSet relTraitSet, RelNode input, RexNode condition) {
+    final RelOptCluster cluster = getCluster();
+    return new ElasticsearchFilter(cluster, relTraitSet, input, condition);
+  }
+
+  /**
+   * Implements the input first, then adds the translated filter condition to
+   * the implementor.
+   *
+   * <p>Field names come from the expressions of the input when it is a
+   * {@link Project}, and from this node's row type otherwise.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+
+    final List<String> names;
+    if (!(input instanceof Project)) {
+      names = ElasticsearchRules.elasticsearchFieldNames(getRowType());
+    } else {
+      final List<RexNode> expressions = ((Project) input).getProjects();
+      names = new ArrayList<>(expressions.size());
+      for (RexNode expression : expressions) {
+        names.add(expression.accept(MapProjectionFieldVisitor.INSTANCE));
+      }
+    }
+
+    final Translator translator = new Translator(names);
+    implementor.add(translator.translateMatch(condition));
+  }
+
+ /**
+ * Translates {@link RexNode} expressions into Elasticsearch expression strings.
+ */
+ static class Translator {
+ final JsonBuilder builder = new JsonBuilder();
+ final Multimap<String, Pair<String, RexLiteral>> multimap =
+ HashMultimap.create();
+ final Map<String, RexLiteral> eqMap = new LinkedHashMap<>();
+ private final List<String> fieldNames;
+
+ /**
+ * Creates a Translator.
+ *
+ * @param fieldNames field names, looked up by the index of an input
+ * reference when a predicate is translated
+ */
+ Translator(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+    /**
+     * Translates a filter condition into a {@code "query"} JSON fragment of the
+     * form {@code "query" : {"constant_score":{"filter":...}}}; all whitespace
+     * is stripped from the serialized JSON.
+     *
+     * @param condition filter condition
+     * @return the query fragment
+     */
+    private String translateMatch(RexNode condition) {
+      // innermost node: the translated predicate goes under "filter"
+      final Map<String, Object> constantScore = new LinkedHashMap<>();
+      constantScore.put("filter", translateOr(condition));
+
+      // outer node: wrap it in "constant_score"
+      final Map<String, Object> query = builder.map();
+      query.put("constant_score", constantScore);
+
+      final String json = builder.toJsonString(query);
+      return "\"query\" : " + json.replaceAll("\\s+", "");
+    }
+
+    /**
+     * Translates a condition that may be an OR of other conditions.
+     *
+     * <p>Each disjunct is translated by {@link #translateAnd} and wrapped in a
+     * {@code bool}/{@code must} node. Several disjuncts are combined under
+     * {@code bool}/{@code should}; a single disjunct is returned as is.
+     *
+     * <p>Degenerate inputs no longer fail with an
+     * {@link IndexOutOfBoundsException}: a disjunct that produces no filters
+     * (an empty conjunction, i.e. always true) becomes {@code match_all}, and
+     * a condition with no disjuncts at all (always false) becomes
+     * {@code bool}/{@code must_not}/{@code match_all}.
+     *
+     * @param condition filter condition
+     * @return map structure to be serialized as the Elasticsearch filter
+     */
+    private Object translateOr(RexNode condition) {
+      final List<RexNode> orNodes = RelOptUtil.disjunctions(condition);
+      final List<Object> list = new ArrayList<>(orNodes.size());
+
+      for (RexNode node : orNodes) {
+        final List<Map<String, Object>> andNodes = translateAnd(node);
+        if (andNodes.isEmpty()) {
+          // Nothing to AND together: the disjunct holds for every document.
+          // (The original code called andNodes.get(0) here, which always threw.)
+          list.add(matchAll());
+        } else {
+          final Map<String, Object> andClause = new HashMap<>();
+          andClause.put("must", andNodes);
+          list.add(boolQuery(andClause));
+        }
+      }
+
+      if (list.isEmpty()) {
+        // Nothing to OR together: the condition holds for no document.
+        final List<Object> everything = new ArrayList<>();
+        everything.add(matchAll());
+        final Map<String, Object> nothing = builder.map();
+        nothing.put("must_not", everything);
+        return boolQuery(nothing);
+      }
+
+      if (orNodes.size() > 1) {
+        final Map<String, Object> map = builder.map();
+        map.put("should", list);
+        return boolQuery(map);
+      }
+      return list.get(0);
+    }
+
+    /** Wraps a clause body in a {@code bool} node. */
+    private static Map<String, Object> boolQuery(Map<String, Object> body) {
+      final Map<String, Object> node = new LinkedHashMap<>();
+      node.put("bool", body);
+      return node;
+    }
+
+    /** Creates a {@code match_all} node with an empty body. */
+    private static Map<String, Object> matchAll() {
+      final Map<String, Object> node = new LinkedHashMap<>();
+      node.put("match_all", new LinkedHashMap<String, Object>());
+      return node;
+    }
+
+ private void addPredicate(Map<String, Object> map, String op, Object v) {
+ if (map.containsKey(op) && stronger(op, map.get(op), v)) {
+ return;
+ }
+ map.put(op, v);
+ }
+
+ /**
+ * Translates a condition that may be an AND of other conditions. Gathers
+ * together conditions that apply to the same field.
+ *
+ * <p>Clears and then refills the shared {@code eqMap} and {@code multimap},
+ * so state from a previous call is discarded. An equality on a field
+ * discards every other predicate recorded for that field.
+ *
+ * @param node0 expression node
+ * @return list of elastic search term filters
+ */
+ private List<Map<String, Object>> translateAnd(RexNode node0) {
+ // reset state left over from the previous disjunct
+ eqMap.clear();
+ multimap.clear();
+ // record each conjunct in eqMap (equality) or multimap (everything else)
+ for (RexNode node : RelOptUtil.conjunctions(node0)) {
+ translateMatch2(node);
+ }
+ List<Map<String, Object>> filters = new ArrayList<>();
+ // equalities become {"term": {field: value}}
+ for (Map.Entry<String, RexLiteral> entry : eqMap.entrySet()) {
+ // an equality supersedes any other predicate on the same field
+ multimap.removeAll(entry.getKey());
+
+ Map<String, Object> filter = new HashMap<>();
+ filter.put(entry.getKey(), literalValue(entry.getValue()));
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("term", filter);
+ filters.add(map);
+ }
+ // remaining predicates: one filter map per field
+ for (Map.Entry<String, Collection<Pair<String, RexLiteral>>> entry
+ : multimap.asMap().entrySet()) {
+ Map<String, Object> map2 = builder.map();
+
+ Map<String, Object> map = new HashMap<>();
+ for (Pair<String, RexLiteral> s : entry.getValue()) {
+ if (!s.left.equals("not")) {
+ // range operator (lt, lte, gt, gte): keep only the stronger bound per operator
+ addPredicate(map2, s.left, literalValue(s.right));
+
+ Map<String, Object> filter = new HashMap<>();
+ filter.put(entry.getKey(), map2);
+
+ map.put("range", filter);
+ } else {
+ // inequality becomes {"not": {"term": {field: value}}}
+ // NOTE(review): map2 is the same instance used by the range branch above, so a
+ // field with both a range bound and a "not" predicate gets the operator keys and
+ // the field key mixed in one map; a second "not" on the same field also
+ // overwrites the first. Looks unintended - confirm.
+ map2.put(entry.getKey(), literalValue(s.right));
+
+ Map<String, Object> termMap = new HashMap<>();
+ termMap.put("term", map2);
+
+ map.put("not", termMap);
+ }
+ }
+ filters.add(map);
+ }
+ return filters;
+ }
+
+ private boolean stronger(String key, Object v0, Object v1) {
+ if (key.equals("lt") || key.equals("lte")) {
+ if (v0 instanceof Number && v1 instanceof Number) {
+ return ((Number) v0).doubleValue() < ((Number) v1).doubleValue();
+ }
+ if (v0 instanceof String && v1 instanceof String) {
+ return v0.toString().compareTo(v1.toString()) < 0;
+ }
+ }
+ if (key.equals("gt") || key.equals("gte")) {
+ return stronger("lt", v1, v0);
+ }
+ return false;
+ }
+
+ private static Object literalValue(RexLiteral literal) {
+ return literal.getValue2();
+ }
+
+ private Void translateMatch2(RexNode node) {
+ switch (node.getKind()) {
+ case EQUALS:
+ return translateBinary(null, null, (RexCall) node);
+ case LESS_THAN:
+ return translateBinary("lt", "gt", (RexCall) node);
+ case LESS_THAN_OR_EQUAL:
+ return translateBinary("lte", "gte", (RexCall) node);
+ case NOT_EQUALS:
+ return translateBinary("not", "not", (RexCall) node);
+ case GREATER_THAN:
+ return translateBinary("gt", "lt", (RexCall) node);
+ case GREATER_THAN_OR_EQUAL:
+ return translateBinary("gte", "lte", (RexCall) node);
+ default:
+ throw new AssertionError("cannot translate " + node);
+ }
+ }
+
+    /**
+     * Translates a call to a binary operator. Tries the operands in their
+     * given order with {@code op} first and, failing that, swapped with
+     * {@code rop}.
+     *
+     * @param op operation
+     * @param rop opposite operation of {@code op}
+     * @param call current relational call
+     * @return result can be ignored
+     * @throws AssertionError if neither operand order can be translated
+     */
+    private Void translateBinary(String op, String rop, RexCall call) {
+      final RexNode first = call.operands.get(0);
+      final RexNode second = call.operands.get(1);
+      if (translateBinary2(op, first, second) || translateBinary2(rop, second, first)) {
+        return null;
+      }
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+
+    /**
+     * Translates a call to a binary operator. Returns whether successful.
+     *
+     * <p>Succeeds only when {@code right} is a literal and {@code left} is an
+     * input reference, a cast around a translatable expression, or an item
+     * call recognized by {@code ElasticsearchRules.isItem}.
+     *
+     * @param op operation
+     * @param left left node of the expression
+     * @param right right node of the expression
+     * @return {@code true} if translation happened, {@code false} otherwise
+     */
+    private boolean translateBinary2(String op, RexNode left, RexNode right) {
+      final RexLiteral literal;
+      switch (right.getKind()) {
+      case LITERAL:
+        literal = (RexLiteral) right;
+        break;
+      default:
+        return false;
+      }
+
+      switch (left.getKind()) {
+      case INPUT_REF: {
+        final int ordinal = ((RexInputRef) left).getIndex();
+        translateOp2(op, fieldNames.get(ordinal), literal);
+        return true;
+      }
+      case CAST: {
+        // look through the cast and translate its operand instead
+        final RexNode operand = ((RexCall) left).operands.get(0);
+        return translateBinary2(op, operand, right);
+      }
+      case OTHER_FUNCTION: {
+        final String itemName = ElasticsearchRules.isItem((RexCall) left);
+        if (itemName == null) {
+          return false;
+        }
+        translateOp2(op, itemName, literal);
+        return true;
+      }
+      default:
+        return false;
+      }
+    }
+
+    /**
+     * Records a predicate on field {@code name}: in {@code eqMap} when
+     * {@code op} is {@code null} (equality), otherwise in {@code multimap}.
+     */
+    private void translateOp2(String op, String name, RexLiteral right) {
+      if (op != null) {
+        multimap.put(name, Pair.of(op, right));
+        return;
+      }
+      eqMap.put(name, right);
+    }
+ }
+}
+
+// End ElasticsearchFilter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
new file mode 100644
index 0000000..72753e6
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.linq4j.tree.Types;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Builtin methods in the Elasticsearch adapter, each resolved reflectively
+ * when the enum is initialized.
+ */
+enum ElasticsearchMethod {
+  ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
+      "find", List.class, List.class);
+
+  public final Method method;
+
+  /** Lookup from reflective method to the enum constant that declares it. */
+  public static final ImmutableMap<Method, ElasticsearchMethod> MAP = buildMap();
+
+  ElasticsearchMethod(Class clazz, String methodName, Class... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+
+  /** Indexes every constant of this enum by its {@link #method}. */
+  private static ImmutableMap<Method, ElasticsearchMethod> buildMap() {
+    final ImmutableMap.Builder<Method, ElasticsearchMethod> methods = ImmutableMap.builder();
+    for (ElasticsearchMethod each : values()) {
+      methods.put(each.method, each);
+    }
+    return methods.build();
+  }
+}
+
+// End ElasticsearchMethod.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
new file mode 100644
index 0000000..e044703
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Elasticsearch.
+ */
+public class ElasticsearchProject extends Project implements ElasticsearchRel {
+ /**
+ * Creates an ElasticsearchProject.
+ *
+ * <p>When assertions are enabled, verifies that this node uses
+ * {@link ElasticsearchRel#CONVENTION} and that {@code input} has the same
+ * convention.
+ *
+ * @param cluster cluster handed to the {@link Project} super-constructor
+ * @param traitSet trait set handed to the super-constructor
+ * @param input input relational expression
+ * @param projects projected expressions
+ * @param rowType output row type
+ */
+ ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+ List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traitSet, input, projects, rowType);
+ assert getConvention() == ElasticsearchRel.CONVENTION;
+ assert getConvention() == input.getConvention();
+ }
+
+  /**
+   * Creates a copy of this project with the given trait set, input,
+   * expressions and row type.
+   *
+   * <p>The copy is built with the {@code relTraitSet} argument. The previous
+   * code passed this node's own {@code traitSet} field, silently discarding
+   * the traits requested by the caller.
+   *
+   * @param relTraitSet trait set of the copy
+   * @param input input of the copy
+   * @param projects projected expressions of the copy
+   * @param relDataType row type of the copy
+   * @return the new ElasticsearchProject
+   */
+  @Override public Project copy(RelTraitSet relTraitSet, RelNode input, List<RexNode> projects,
+      RelDataType relDataType) {
+    return new ElasticsearchProject(getCluster(), relTraitSet, input, projects, relDataType);
+  }
+
+  /** Returns the cost computed by {@link Project}, scaled by a factor of 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return baseCost.multiplyBy(0.1);
+  }
+
+  /**
+   * Implements the input first, then adds either a {@code "_source"} list
+   * (plain field projections only) or a {@code "script_fields"} object (at
+   * least one literal or computed projection) to the implementor.
+   *
+   * <p>Any {@code "_source"} entry added earlier is dropped before the new
+   * fragment is added.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+
+    final List<String> inFields =
+        ElasticsearchRules.elasticsearchFieldNames(getInput().getRowType());
+    final ElasticsearchRules.RexToElasticsearchTranslator translator =
+        new ElasticsearchRules.RexToElasticsearchTranslator(
+            (JavaTypeFactory) getCluster().getTypeFactory(), inFields);
+
+    final List<String> fields = new ArrayList<>();
+    final List<String> scriptFields = new ArrayList<>();
+    for (Pair<RexNode, String> pair : getNamedProjects()) {
+      final String name = pair.right;
+      final String expr = pair.left.accept(translator);
+
+      if (expr.equals("\"" + name + "\"")) {
+        // plain field reference
+        fields.add(name);
+      } else if (expr.matches("\"literal\":.+")) {
+        // Keep everything after the first colon. Splitting on every colon
+        // would truncate a literal whose own text contains ':'.
+        scriptFields.add(ElasticsearchRules.quote(name)
+            + ":{\"script\": "
+            + expr.substring(expr.indexOf(':') + 1) + "}");
+      } else {
+        scriptFields.add(ElasticsearchRules.quote(name)
+            + ":{\"script\":"
+            // _source (ES2) vs params._source (ES5)
+            + "\"" + implementor.elasticsearchTable.scriptedFieldPrefix() + "."
+            + expr.replaceAll("\"", "") + "\"}");
+      }
+    }
+
+    StringBuilder query = new StringBuilder();
+    if (scriptFields.isEmpty()) {
+      List<String> newList = Lists.transform(fields, ElasticsearchRules::quote);
+
+      final String findString = String.join(", ", newList);
+      query.append("\"_source\" : [").append(findString).append("]");
+    } else {
+      // if scripted fields are present, ES ignores _source attribute
+      for (String field : fields) {
+        scriptFields.add(ElasticsearchRules.quote(field) + ":{\"script\": "
+            // _source (ES2) vs params._source (ES5)
+            + "\"" + implementor.elasticsearchTable.scriptedFieldPrefix() + "."
+            + field + "\"}");
+      }
+      query.append("\"script_fields\": {")
+          .append(String.join(", ", scriptFields))
+          .append("}");
+    }
+
+    // Removing inside a for-each over the same list throws
+    // ConcurrentModificationException (or silently skips the last element);
+    // removeIf drops every earlier "_source" entry safely.
+    implementor.list.removeIf(opfield -> opfield.startsWith("\"_source\""));
+    implementor.add(query.toString());
+  }
+}
+
+// End ElasticsearchProject.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
new file mode 100644
index 0000000..436adf9
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Relational expression that uses Elasticsearch calling convention.
+ */
+public interface ElasticsearchRel extends RelNode {
+  /**
+   * Calling convention for relational operations that occur in Elasticsearch.
+   */
+  Convention CONVENTION = new Convention.Impl("ELASTICSEARCH", ElasticsearchRel.class);
+
+  /**
+   * Lets this node contribute to the query being assembled by the given
+   * implementor.
+   *
+   * @param implementor collector of query fragments
+   */
+  void implement(Implementor implementor);
+
+  /**
+   * Callback for the implementation process that converts a tree of
+   * {@link ElasticsearchRel} nodes into an Elasticsearch query.
+   */
+  class Implementor {
+    /** Query fragments in the order they were added. */
+    final List<String> list = new ArrayList<>();
+
+    RelOptTable table;
+    AbstractElasticsearchTable elasticsearchTable;
+
+    /** Appends a query fragment. */
+    public void add(String findOp) {
+      list.add(findOp);
+    }
+
+    /**
+     * Implements the given input with this implementor; only ordinal 0 is
+     * expected (checked when assertions are enabled).
+     */
+    public void visitChild(int ordinal, RelNode input) {
+      assert ordinal == 0;
+      final ElasticsearchRel child = (ElasticsearchRel) input;
+      child.implement(this);
+    }
+  }
+}
+
+// End ElasticsearchRel.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
new file mode 100644
index 0000000..81b915b
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rules and relational operators for
+ * {@link ElasticsearchRel#CONVENTION ELASTICSEARCH}
+ * calling convention.
+ */
+class ElasticsearchRules {
+ static final RelOptRule[] RULES = {
+ ElasticsearchSortRule.INSTANCE,
+ ElasticsearchFilterRule.INSTANCE,
+ ElasticsearchProjectRule.INSTANCE
+ };
+
+ private ElasticsearchRules() {}
+
+  /**
+   * Returns 'string' if it is a call to item['string'], null otherwise.
+   *
+   * <p>The call qualifies only when its operator is ITEM, its first operand
+   * is the input reference with index 0, and its second operand is a literal
+   * whose value is a {@link String}.
+   *
+   * @param call current relational expression
+   * @return literal value
+   */
+  static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    final List<RexNode> operands = call.getOperands();
+    final RexNode target = operands.get(0);
+    final RexNode key = operands.get(1);
+
+    if (!(target instanceof RexInputRef) || ((RexInputRef) target).getIndex() != 0) {
+      return null;
+    }
+    if (!(key instanceof RexLiteral)) {
+      return null;
+    }
+    final Object value = ((RexLiteral) key).getValue2();
+    return value instanceof String ? (String) value : null;
+  }
+
+ static List<String> elasticsearchFieldNames(final RelDataType rowType) {
+ return SqlValidatorUtil.uniquify(
+ new AbstractList<String>() {
+ @Override public String get(int index) {
+ final String name = rowType.getFieldList().get(index).getName();
+ return name.startsWith("$") ? "_" + name.substring(2) : name;
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ },
+ SqlValidatorUtil.EXPR_SUGGESTER, true);
+ }
+
+ static String quote(String s) {
+ return "\"" + s + "\"";
+ }
+
+  /**
+   * Translator from {@link RexNode} to strings in Elasticsearch's expression
+   * language.
+   *
+   * <p>Input references become the double-quoted input field name, non-null
+   * literals become a string starting with {@code "literal":}, and calls are
+   * supported only for item access and casts.
+   */
+  static class RexToElasticsearchTranslator extends RexVisitorImpl<String> {
+    private final JavaTypeFactory typeFactory;
+    private final List<String> inFields;
+
+    /**
+     * Creates a translator.
+     *
+     * @param typeFactory type factory used when translating literals
+     * @param inFields input field names, looked up by input reference index
+     */
+    RexToElasticsearchTranslator(JavaTypeFactory typeFactory, List<String> inFields) {
+      super(true);
+      this.typeFactory = typeFactory;
+      this.inFields = inFields;
+    }
+
+    /**
+     * Returns {@code null} (as text) for a null literal, otherwise
+     * {@code "literal":} followed by the translated literal in double quotes.
+     */
+    @Override public String visitLiteral(RexLiteral literal) {
+      if (literal.getValue() == null) {
+        return "null";
+      }
+      return "\"literal\":\""
+          + RexToLixTranslator.translateLiteral(literal, literal.getType(),
+              typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
+          + "\"";
+    }
+
+    /** Returns the double-quoted name of the referenced input field. */
+    @Override public String visitInputRef(RexInputRef inputRef) {
+      return quote(inFields.get(inputRef.getIndex()));
+    }
+
+    /**
+     * Translates item access by string key, casts, and item access by
+     * integer literal index.
+     *
+     * @throws IllegalArgumentException for any other call
+     */
+    @Override public String visitCall(RexCall call) {
+      final String name = isItem(call);
+      if (name != null) {
+        return "\"" + name + "\"";
+      }
+
+      final List<String> strings = visitList(call.operands);
+      if (call.getKind() == SqlKind.CAST) {
+        // a cast is transparent: use the operand, minus a leading '$' if present
+        return strings.get(0).startsWith("$") ? strings.get(0).substring(1) : strings.get(0);
+      }
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        final RexNode op1 = call.getOperands().get(1);
+        if (op1 instanceof RexLiteral && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          return stripQuotes(strings.get(0)) + "[" + ((RexLiteral) op1).getValue2() + "]";
+        }
+      }
+      // The message previously lacked the space before "is", gluing the call
+      // text to the rest of the sentence.
+      throw new IllegalArgumentException("Translation of " + call.toString()
+          + " is not supported by ElasticsearchProject");
+    }
+
+    /** Removes one pair of enclosing single quotes, if present. */
+    private String stripQuotes(String s) {
+      return s.startsWith("'") && s.endsWith("'") ? s.substring(1, s.length() - 1) : s;
+    }
+
+    /** Translates each node of the list in order. */
+    List<String> visitList(List<RexNode> list) {
+      final List<String> strings = new ArrayList<>();
+      for (RexNode node : list) {
+        strings.add(node.accept(this));
+      }
+      return strings;
+    }
+  }
+
+ /**
+ * Base class for planner rules that convert a relational expression to
+ * Elasticsearch calling convention.
+ *
+ * <p>Keeps the target convention in {@link #out} so that subclasses can use
+ * it when building the converted node.
+ */
+ abstract static class ElasticsearchConverterRule extends ConverterRule {
+ /** Target convention, as passed to the constructor. */
+ final Convention out;
+
+ /**
+ * Creates a converter rule; all arguments are forwarded to
+ * {@link ConverterRule} and {@code out} is additionally retained.
+ */
+ ElasticsearchConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out,
+ String description) {
+ super(clazz, in, out, description);
+ this.out = out;
+ }
+ }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to an
+   * {@link ElasticsearchSort}.
+   */
+  private static class ElasticsearchSortRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchSortRule INSTANCE =
+        new ElasticsearchSortRule();
+
+    private ElasticsearchSortRule() {
+      super(Sort.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+          "ElasticsearchSortRule");
+    }
+
+    /**
+     * Builds the ElasticsearchSort with the sort's collation, offset and
+     * fetch; the input is converted to the same traits but with an empty
+     * collation.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final Sort sort = (Sort) relNode;
+      final RelTraitSet sortTraits =
+          sort.getTraitSet().replace(out).replace(sort.getCollation());
+      final RelTraitSet inputTraits = sortTraits.replace(RelCollations.EMPTY);
+      final RelNode convertedInput = convert(sort.getInput(), inputTraits);
+      return new ElasticsearchSort(relNode.getCluster(), sortTraits, convertedInput,
+          sort.getCollation(), sort.offset, sort.fetch);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to an
+   * {@link ElasticsearchFilter}.
+   */
+  private static class ElasticsearchFilterRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+    private ElasticsearchFilterRule() {
+      super(LogicalFilter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+          "ElasticsearchFilterRule");
+    }
+
+    /**
+     * Builds the ElasticsearchFilter over the converted input, keeping the
+     * logical filter's condition.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalFilter logical = (LogicalFilter) relNode;
+      final RelTraitSet traits = logical.getTraitSet().replace(out);
+      final RelNode convertedInput = convert(logical.getInput(), out);
+      return new ElasticsearchFilter(relNode.getCluster(), traits, convertedInput,
+          logical.getCondition());
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+   * to an {@link ElasticsearchProject}.
+   */
+  private static class ElasticsearchProjectRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchProjectRule INSTANCE = new ElasticsearchProjectRule();
+
+    private ElasticsearchProjectRule() {
+      super(LogicalProject.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+          "ElasticsearchProjectRule");
+    }
+
+    /**
+     * Builds the ElasticsearchProject over the converted input, keeping the
+     * logical project's expressions and row type.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalProject logical = (LogicalProject) relNode;
+      final RelTraitSet traits = logical.getTraitSet().replace(out);
+      final RelNode convertedInput = convert(logical.getInput(), out);
+      return new ElasticsearchProject(logical.getCluster(), traits, convertedInput,
+          logical.getProjects(), logical.getRowType());
+    }
+  }
+}
+
+// End ElasticsearchRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
new file mode 100644
index 0000000..a446615
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Schema mapped onto an index of ELASTICSEARCH types.
+ *
+ * <p>Each table in the schema is an ELASTICSEARCH type in that index.
+ */
+public class ElasticsearchSchema extends AbstractSchema {
+
+ private final String index;
+
+ private final RestClient client;
+
+ private final ObjectMapper mapper;
+
+ /**
+ * Allows schema to be instantiated from an existing Elasticsearch client.
+ * This constructor is used in tests.
+ *
+ * @param client existing client instance
+ * @param mapper mapper for JSON (de)serialization
+ * @param index name of ES index
+ * @throws NullPointerException if any argument is {@code null}
+ */
+ ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
+ super();
+ this.client = Objects.requireNonNull(client, "client");
+ this.mapper = Objects.requireNonNull(mapper, "mapper");
+ this.index = Objects.requireNonNull(index, "index");
+ }
+
+  /**
+   * Creates one {@link ElasticsearchTable} per type reported by
+   * {@link #listTypes()}, keyed by type name.
+   *
+   * @throws UncheckedIOException if the types cannot be retrieved
+   */
+  @Override protected Map<String, Table> getTableMap() {
+    try {
+      final ImmutableMap.Builder<String, Table> tables = ImmutableMap.builder();
+      for (String type : listTypes()) {
+        tables.put(type, new ElasticsearchTable(client, index, type));
+      }
+      return tables.build();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to get types for " + index, e);
+    }
+  }
+
+  /**
+   * Queries the {@code _mapping} definition to automatically detect all types
+   * for an index. The {@code _default_} entry is not reported as a type.
+   *
+   * @return set of types associated with this index
+   * @throws IOException for any IO related issues
+   * @throws IllegalStateException if reply is not understood
+   */
+  private Set<String> listTypes() throws IOException {
+    final Response response = client.performRequest("GET", index + "/_mapping");
+    try (InputStream content = response.getEntity().getContent()) {
+      final JsonNode root = mapper.readTree(content);
+      final boolean singleObject = root.isObject() && root.size() == 1;
+      if (!singleObject) {
+        throw new IllegalStateException(
+            String.format(Locale.ROOT, "Invalid response for %s/%s "
+                + "Expected object of size 1 got %s (of size %d)", response.getHost(),
+                response.getRequestLine(), root.getNodeType(), root.size()));
+      }
+
+      // the single entry of the root object carries the "mappings" node
+      final JsonNode mappings = root.iterator().next().get("mappings");
+      if (mappings == null || mappings.size() == 0) {
+        throw new IllegalStateException(
+            String.format(Locale.ROOT, "Index %s does not have any types", index));
+      }
+
+      final Set<String> typeNames = Sets.newHashSet(mappings.fieldNames());
+      typeNames.remove("_default_");
+      return typeNames;
+    }
+  }
+
+  /** Returns the name of the Elasticsearch index this schema was created for. */
+  public String getIndex() {
+    return this.index;
+  }
+}
+
+// End ElasticsearchSchema.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
new file mode 100644
index 0000000..5b93a51
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import org.apache.http.HttpHost;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import com.google.common.base.Preconditions;
+
+import org.elasticsearch.client.RestClient;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Factory that creates an {@link ElasticsearchSchema}.
+ *
+ * <p>Allows a custom schema to be included in a model.json file.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class ElasticsearchSchemaFactory implements SchemaFactory {
+
+  public ElasticsearchSchemaFactory() {
+  }
+
+  /**
+   * Creates an {@link ElasticsearchSchema} from the model operand.
+   *
+   * <p>Recognized operand keys:
+   * <ul>
+   *   <li>{@code coordinates} (required): JSON object, passed as a string, which
+   *   maps host name to port. Single quotes are accepted.</li>
+   *   <li>{@code index} (required): name of the index.</li>
+   *   <li>{@code userConfig} (optional): JSON object passed as a string. It is
+   *   only parsed for validation; its content is not used.</li>
+   * </ul>
+   *
+   * <p>All operands are validated before the rest client is created, so that an
+   * invalid configuration does not leave an open (and unreachable) client behind.
+   *
+   * @param parentSchema parent schema
+   * @param name name of the schema
+   * @param operand configuration of the schema
+   * @return schema backed by a newly created low-level rest client
+   * @throws IllegalArgumentException if {@code index} or {@code coordinates}
+   *     is missing
+   * @throws RuntimeException if a JSON operand can't be parsed
+   */
+  @Override public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    Objects.requireNonNull(operand, "operand");
+
+    final String index = (String) operand.get("index");
+    Preconditions.checkArgument(index != null, "index is missing in configuration");
+
+    final String coordinatesJson = (String) operand.get("coordinates");
+    Preconditions.checkArgument(coordinatesJson != null,
+        "coordinates are missing in configuration");
+
+    final String userConfigJson = (String) operand.get("userConfig");
+
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+
+    try {
+      final Map<String, Integer> coordinates =
+          mapper.readValue(coordinatesJson,
+              new TypeReference<Map<String, Integer>>() { });
+
+      if (userConfigJson != null) {
+        // not used yet; parsed so that a malformed value is still reported
+        mapper.readValue(userConfigJson,
+            new TypeReference<Map<String, String>>() { });
+      }
+
+      // connect last: everything that can be validated up-front has been
+      final RestClient client = connect(coordinates);
+      return new ElasticsearchSchema(client, new ObjectMapper(), index);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot parse values from json", e);
+    }
+  }
+
+  /**
+   * Builds elastic rest client from user configuration.
+   *
+   * @param coordinates list of {@code hostname/port} to connect to
+   * @return newly initialized low-level rest http client for ES
+   * @throws NullPointerException if {@code coordinates} is null
+   * @throws IllegalArgumentException if {@code coordinates} is empty
+   */
+  private static RestClient connect(Map<String, Integer> coordinates) {
+    Objects.requireNonNull(coordinates, "coordinates");
+    Preconditions.checkArgument(!coordinates.isEmpty(), "no ES coordinates specified");
+    // LinkedHashSet keeps the hosts in the order given by the configuration
+    final Set<HttpHost> set = new LinkedHashSet<>();
+    for (Map.Entry<String, Integer> entry: coordinates.entrySet()) {
+      set.add(new HttpHost(entry.getKey(), entry.getValue()));
+    }
+
+    return RestClient.builder(set.toArray(new HttpHost[0])).build();
+  }
+
+}
+
+// End ElasticsearchSchemaFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSearchResult.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSearchResult.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSearchResult.java
new file mode 100644
index 0000000..9bd19d8
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSearchResult.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Internal object used to parse an Elasticsearch search reply.
+ * Since we're using the low-level rest client the response has to be
+ * processed manually; unknown JSON properties are ignored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ElasticsearchSearchResult {
+
+  private final SearchHits hits;
+  private final long took;
+
+  /**
+   * Constructor for this instance.
+   *
+   * @param hits list of matched documents
+   * @param took time taken for this query to execute; exposed by
+   *     {@link #took()} as a number of milliseconds
+   */
+  @JsonCreator
+  ElasticsearchSearchResult(@JsonProperty("hits") SearchHits hits,
+      @JsonProperty("took") long took) {
+    this.took = took;
+    this.hits = Objects.requireNonNull(hits, "hits");
+  }
+
+  /**
+   * Returns the container of matched documents.
+   *
+   * @return hits of this result, never null
+   */
+  public SearchHits searchHits() {
+    return this.hits;
+  }
+
+  /**
+   * Returns the {@code took} attribute of the reply as a duration.
+   *
+   * @return {@code took} interpreted as milliseconds
+   */
+  public Duration took() {
+    return Duration.ofMillis(this.took);
+  }
+
+  /**
+   * Similar to {@code SearchHits} in ES. Container for {@link SearchHit}.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class SearchHits {
+
+    private final long total;
+    private final List<SearchHit> hits;
+
+    @JsonCreator
+    SearchHits(@JsonProperty("total") final long total,
+        @JsonProperty("hits") final List<SearchHit> hits) {
+      this.hits = Objects.requireNonNull(hits, "hits");
+      this.total = total;
+    }
+
+    /**
+     * Returns the list of hits as it was parsed from the reply.
+     *
+     * @return parsed hits, never null
+     */
+    public List<SearchHit> hits() {
+      return hits;
+    }
+
+    /**
+     * Returns the {@code total} attribute of the reply.
+     *
+     * @return value of {@code total}
+     */
+    public long total() {
+      return this.total;
+    }
+
+  }
+
+  /**
+   * Concrete result record which matched the query. Similar to {@code SearchHit} in ES.
+   *
+   * <p>Exactly one of {@code _source} and {@code fields} is populated.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class SearchHit {
+    private final String id;
+    private final Map<String, Object> source;
+    private final Map<String, Object> fields;
+
+    @JsonCreator
+    private SearchHit(@JsonProperty("_id") final String id,
+        @JsonProperty("_source") final Map<String, Object> source,
+        @JsonProperty("fields") final Map<String, Object> fields) {
+      this.id = Objects.requireNonNull(id, "id");
+
+      final boolean hasSource = source != null;
+      final boolean hasFields = fields != null;
+      if (hasSource == hasFields) {
+        // either both are missing or both are present: neither is acceptable
+        final String template = hasSource
+            ? "Both '_source' and 'fields' are populated (non-null) for %s"
+            : "Both '_source' and 'fields' are missing for %s";
+        throw new IllegalArgumentException(String.format(Locale.ROOT, template, id));
+      }
+
+      this.source = source;
+      this.fields = fields;
+    }
+
+    /**
+     * Returns id of this hit (usually document id).
+     *
+     * @return unique id
+     */
+    public String id() {
+      return this.id;
+    }
+
+    /**
+     * Finds specific attribute from ES search result.
+     *
+     * <p>When the hit carries {@code fields} and the attribute is an
+     * {@link Iterable}, its first element (or null when empty) is returned.
+     *
+     * @param name attribute name
+     * @return value from result (_source or fields)
+     * @throws IllegalArgumentException if this hit has no such attribute
+     */
+    Object value(String name) {
+      Objects.requireNonNull(name, "name");
+
+      final Map<String, Object> attributes = sourceOrFields();
+      if (!attributes.containsKey(name)) {
+        throw new IllegalArgumentException(
+            String.format(Locale.ROOT,
+                "Attribute %s not found in search result %s", name, id));
+      }
+
+      if (source != null) {
+        return source.get(name);
+      }
+
+      if (fields == null) {
+        throw new AssertionError("Shouldn't get here: " + id);
+      }
+
+      final Object raw = fields.get(name);
+      if (!(raw instanceof Iterable)) {
+        return raw;
+      }
+
+      // return first element (or null)
+      final Iterator<?> elements = ((Iterable<?>) raw).iterator();
+      return elements.hasNext() ? elements.next() : null;
+    }
+
+    /**
+     * Returns the {@code _source} of this hit.
+     *
+     * @return source map, or null when this hit carries {@code fields}
+     */
+    public Map<String, Object> source() {
+      return this.source;
+    }
+
+    /**
+     * Returns the {@code fields} of this hit.
+     *
+     * @return fields map, or null when this hit carries {@code _source}
+     */
+    public Map<String, Object> fields() {
+      return this.fields;
+    }
+
+    /**
+     * Returns whichever of {@code _source} and {@code fields} is populated.
+     *
+     * @return {@code _source} when present, otherwise {@code fields}
+     */
+    public Map<String, Object> sourceOrFields() {
+      if (source != null) {
+        return source;
+      }
+      return fields;
+    }
+  }
+
+}
+
+// End ElasticsearchSearchResult.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
new file mode 100644
index 0000000..ed669aa
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * relational expression in Elasticsearch.
+ */
+public class ElasticsearchSort extends Sort implements ElasticsearchRel {
+  ElasticsearchSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+      RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Scales the cost computed by {@link Sort} by a factor of 0.05. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  /**
+   * Creates a copy of this sort with the supplied traits, input, collation,
+   * offset and fetch.
+   *
+   * <p>The collation of the copy is the one passed in by the caller, not the
+   * collation of this instance.
+   */
+  @Override public Sort copy(RelTraitSet traitSet, RelNode relNode, RelCollation relCollation,
+      RexNode offset, RexNode fetch) {
+    return new ElasticsearchSort(getCluster(), traitSet, relNode, relCollation, offset, fetch);
+  }
+
+  /**
+   * Visits the input and then adds the {@code "sort"}, {@code "from"} and
+   * {@code "size"} fragments of the query; each fragment is only added when the
+   * corresponding collation, offset or fetch is present.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    if (!collation.getFieldCollations().isEmpty()) {
+      final List<String> keys = new ArrayList<>();
+      if (input instanceof Project) {
+        // sort keys refer to projected expressions: resolve them to field names
+        final List<RexNode> projects = ((Project) input).getProjects();
+
+        for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+          RexNode project = projects.get(fieldCollation.getFieldIndex());
+          String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
+          keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
+        }
+      } else {
+        final List<RelDataTypeField> fields = getRowType().getFieldList();
+
+        for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+          final String name = fields.get(fieldCollation.getFieldIndex()).getName();
+          keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
+        }
+      }
+
+      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
+    }
+
+    if (offset != null) {
+      implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
+    }
+
+    if (fetch != null) {
+      implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
+    }
+  }
+
+  /**
+   * Translates the direction of a field collation to a quoted sort order.
+   *
+   * @param fieldCollation collation of a single sort key
+   * @return {@code "desc"} for descending directions, {@code "asc"} otherwise
+   */
+  private String direction(RelFieldCollation fieldCollation) {
+    switch (fieldCollation.getDirection()) {
+    case DESCENDING:
+    case STRICTLY_DESCENDING:
+      return "\"desc\"";
+    case ASCENDING:
+    case STRICTLY_ASCENDING:
+    default:
+      return "\"asc\"";
+    }
+  }
+}
+
+// End ElasticsearchSort.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
new file mode 100644
index 0000000..7667563
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.util.Util;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Table based on an Elasticsearch type.
+ */
+public class ElasticsearchTable extends AbstractElasticsearchTable {
+  private final RestClient restClient;
+  private final ElasticsearchVersion version;
+  private final ObjectMapper mapper;
+
+  /**
+   * Creates an ElasticsearchTable.
+   *
+   * <p>The version of the server is detected eagerly, which means that a
+   * request is sent to ES while the table is being constructed.
+   *
+   * @param client low-level ES rest client
+   * @param indexName elastic search index
+   * @param typeName elastic search index type
+   * @throws UncheckedIOException if the version of ES can't be detected
+   */
+  ElasticsearchTable(RestClient client, String indexName, String typeName) {
+    super(indexName, typeName);
+    this.restClient = Objects.requireNonNull(client, "client");
+    this.mapper = new ObjectMapper();
+    try {
+      this.version = detectVersion(client, mapper);
+    } catch (IOException e) {
+      final String message = String.format(Locale.ROOT, "Couldn't detect ES version "
+          + "for %s/%s", indexName, typeName);
+      throw new UncheckedIOException(message, e);
+    }
+
+  }
+
+  /**
+   * Detects current Elastic Search version by connecting to a existing instance.
+   * It is a {@code GET} request to {@code /}. Returned JSON has server information
+   * (including version).
+   *
+   * @param client low-level rest client connected to ES instance
+   * @param mapper Jackson mapper instance used to parse responses
+   * @return version parsed by {@link ElasticsearchVersion#fromString(String)}
+   *     from the {@code version.number} attribute of the reply
+   * @throws IOException if couldn't connect to ES
+   * @throws IllegalStateException if the reply has no {@code version.number}
+   */
+  private static ElasticsearchVersion detectVersion(RestClient client, ObjectMapper mapper)
+      throws IOException {
+    HttpEntity entity = client.performRequest("GET", "/").getEntity();
+    JsonNode node = mapper.readTree(EntityUtils.toString(entity));
+    // path() never returns null, so an unexpected reply is reported below
+    // with a descriptive message rather than as a NullPointerException
+    final JsonNode number = node == null ? null : node.path("version").path("number");
+    if (number == null || number.isMissingNode() || number.isNull()) {
+      final String message = String.format(Locale.ROOT,
+          "Couldn't find version.number in ES reply: %s", node);
+      throw new IllegalStateException(message);
+    }
+    return ElasticsearchVersion.fromString(number.asText());
+  }
+
+  /**
+   * Returns the prefix used to reference the document source in scripted fields.
+   *
+   * @return {@code _source} for ES2, {@code params._source} for any other version
+   */
+  @Override protected String scriptedFieldPrefix() {
+    // ES2 vs ES5 scripted field difference
+    return version == ElasticsearchVersion.ES2 ? "_source" : "params._source";
+  }
+
+  /**
+   * Executes a search built from the given query fragments.
+   *
+   * @param index index name (not used by this implementation, which queries
+   *     the index and type this table was created with)
+   * @param ops JSON fragments which are joined with commas into a single
+   *     query object; an empty list results in {@code {}}
+   * @param fields names and classes of the attributes to extract from each hit
+   * @return enumerable over the converted hits of the reply
+   * @throws UncheckedIOException if the request fails with an IO error
+   */
+  @Override protected Enumerable<Object> find(String index, List<String> ops,
+      List<Map.Entry<String, Class>> fields) {
+
+    final String query;
+    if (!ops.isEmpty()) {
+      query = "{" + Util.toString(ops, "", ", ", "") + "}";
+    } else {
+      query = "{}";
+    }
+
+    try {
+      ElasticsearchSearchResult result = httpRequest(query);
+      final Function1<ElasticsearchSearchResult.SearchHit, Object> getter =
+          ElasticsearchEnumerators.getter(fields);
+      return Linq4j.asEnumerable(result.searchHits().hits()).select(getter);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Sends the query as a {@code POST} to {@code /<index>/<type>/_search} and
+   * parses the reply.
+   *
+   * @param query JSON query to execute
+   * @return parsed search result
+   * @throws IOException if the request fails or the reply can't be read
+   * @throws RuntimeException if ES answers with a status other than 200
+   */
+  private ElasticsearchSearchResult httpRequest(String query) throws IOException {
+    Objects.requireNonNull(query, "query");
+    String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, typeName);
+    HttpEntity entity = new StringEntity(query, ContentType.APPLICATION_JSON);
+    Response response = restClient.performRequest("POST", uri, Collections.emptyMap(), entity);
+
+    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+      final String error = EntityUtils.toString(response.getEntity());
+      final String message = String.format(Locale.ROOT,
+          "Error while querying Elastic (on %s/%s) status: %s\nQuery:\n%s\nError:\n%s\n",
+          response.getHost(), response.getRequestLine(), response.getStatusLine(), query, error);
+      throw new RuntimeException(message);
+    }
+
+    try (InputStream is = response.getEntity().getContent()) {
+      return mapper.readValue(is, ElasticsearchSearchResult.class);
+    }
+  }
+}
+
+// End ElasticsearchTable.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
new file mode 100644
index 0000000..7795ad3
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Relational expression representing a scan of an Elasticsearch type.
+ *
+ * <p> Additional operations might be applied,
+ * using the "find" method.</p>
+ */
+public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
+  private final AbstractElasticsearchTable elasticsearchTable;
+  private final RelDataType projectRowType;
+
+  /**
+   * Creates an ElasticsearchTableScan.
+   *
+   * @param cluster Cluster
+   * @param traitSet Trait set
+   * @param table Table
+   * @param elasticsearchTable Elasticsearch table, must not be null
+   * @param projectRowType Fields and types to project; null to project raw row
+   */
+  ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, AbstractElasticsearchTable elasticsearchTable,
+      RelDataType projectRowType) {
+    super(cluster, traitSet, table);
+    this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable);
+    this.projectRowType = projectRowType;
+
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+  }
+
+  /** Returns this very instance; a scan has no inputs to replace. */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  /** Returns the projected row type when one was given, else the type of the table. */
+  @Override public RelDataType deriveRowType() {
+    if (projectRowType == null) {
+      return super.deriveRowType();
+    }
+    return projectRowType;
+  }
+
+  /**
+   * Scales the default scan cost by {@code 0.1}, and additionally by
+   * {@code fieldCount / 100} when a projection is present.
+   */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final float fraction;
+    if (projectRowType != null) {
+      fraction = (float) projectRowType.getFieldCount() / 100f;
+    } else {
+      fraction = 1f;
+    }
+    final double factor = .1 * fraction;
+    return super.computeSelfCost(planner, mq).multiplyBy(factor);
+  }
+
+  /** Registers the converter rule followed by all {@link ElasticsearchRules#RULES}. */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(ElasticsearchToEnumerableConverterRule.INSTANCE);
+    for (RelOptRule esRule : ElasticsearchRules.RULES) {
+      planner.addRule(esRule);
+    }
+  }
+
+  /** Hands the Elasticsearch table and the planner table over to the implementor. */
+  @Override public void implement(Implementor implementor) {
+    implementor.table = table;
+    implementor.elasticsearchTable = elasticsearchTable;
+  }
+}
+
+// End ElasticsearchTableScan.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
new file mode 100644
index 0000000..d2896d3
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.Lists;
+
+import java.util.AbstractList;
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of a table in an Elasticsearch data source.
+ */
+public class ElasticsearchToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+  ElasticsearchToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits,
+      RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    final RelNode onlyInput = sole(inputs);
+    return new ElasticsearchToEnumerableConverter(getCluster(), traitSet, onlyInput);
+  }
+
+  /** Scales the default cost of the converter by {@code 0.1}. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost defaultCost = super.computeSelfCost(planner, mq);
+    return defaultCost.multiplyBy(.1);
+  }
+
+  /**
+   * Generates the code which calls the "find" method of the queryable table.
+   *
+   * <p>The input is visited first to collect the query fragments; the block
+   * then declares {@code fields}, {@code table}, {@code ops} and
+   * {@code enumerable} (in that order) and returns the enumerable.
+   */
+  @Override public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
+    final BlockBuilder block = new BlockBuilder();
+    final ElasticsearchRel.Implementor esImplementor = new ElasticsearchRel.Implementor();
+    esImplementor.visitChild(0, getInput());
+
+    final RelDataType rowType = getRowType();
+    final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
+        prefer.prefer(JavaRowFormat.ARRAY));
+
+    // view over the java class of every column of the row type
+    final List<Class> fieldClasses = new AbstractList<Class>() {
+      @Override public Class get(int index) {
+        return physType.fieldClass(index);
+      }
+
+      @Override public int size() {
+        return rowType.getFieldCount();
+      }
+    };
+
+    final Expression fields = block.append("fields",
+        constantArrayList(
+            Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType), fieldClasses),
+            Pair.class));
+
+    final Expression table = block.append("table",
+        esImplementor.table
+            .getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class));
+
+    final List<String> operations = esImplementor.list;
+    final Expression ops = block.append("ops", constantArrayList(operations, String.class));
+
+    final Expression enumerable = block.append("enumerable",
+        Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
+            fields));
+
+    if (CalcitePrepareImpl.DEBUG) {
+      System.out.println("Elasticsearch: " + operations);
+    }
+    Hook.QUERY_PLAN.run(operations);
+
+    block.add(Expressions.return_(null, enumerable));
+    return implementor.result(physType, block.toBlock());
+  }
+
+  /** E.g. {@code constantArrayList("x", "y")} returns
+   * "Arrays.asList('x', 'y')".
+   * @param values list of values
+   * @param clazz runtime class representing each element in the list
+   * @param <T> type of elements in the list
+   * @return method call which creates a list
+   */
+  private static <T> MethodCallExpression constantArrayList(List<T> values, Class clazz) {
+    final List<Expression> constants = constantList(values);
+    return Expressions.call(BuiltInMethod.ARRAYS_AS_LIST.method,
+        Expressions.newArrayInit(clazz, constants));
+  }
+
+  /** E.g. {@code constantList("x", "y")} returns
+   * {@code {ConstantExpression("x"), ConstantExpression("y")}}.
+   * @param values list of elements
+   * @param <T> type of elements inside this list
+   * @return list of constant expressions
+   */
+  private static <T> List<Expression> constantList(List<T> values) {
+    return Lists.transform(values, Expressions::constant);
+  }
+}
+
+// End ElasticsearchToEnumerableConverter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
new file mode 100644
index 0000000..af7bbd6
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.function.Predicate;
+
+/**
+ * Rule to convert a relational expression from
+ * {@link ElasticsearchRel#CONVENTION} to {@link EnumerableConvention}.
+ */
+public class ElasticsearchToEnumerableConverterRule extends ConverterRule {
+  /** Shared instance of the rule, created with {@link RelFactories#LOGICAL_BUILDER}. */
+  static final ConverterRule INSTANCE =
+      new ElasticsearchToEnumerableConverterRule(RelFactories.LOGICAL_BUILDER);
+
+  /**
+   * Creates an ElasticsearchToEnumerableConverterRule which matches every
+   * relational expression in the Elasticsearch convention.
+   *
+   * @param relBuilderFactory Builder for relational expressions
+   */
+  ElasticsearchToEnumerableConverterRule(
+      RelBuilderFactory relBuilderFactory) {
+    super(RelNode.class, (Predicate<RelNode>) r -> true,
+        ElasticsearchRel.CONVENTION, EnumerableConvention.INSTANCE,
+        relBuilderFactory, "ElasticsearchToEnumerableConverterRule");
+  }
+
+  /** Wraps the node in a converter whose trait set carries the output convention. */
+  @Override public RelNode convert(RelNode relNode) {
+    return new ElasticsearchToEnumerableConverter(
+        relNode.getCluster(),
+        relNode.getTraitSet().replace(getOutConvention()),
+        relNode);
+  }
+}
+
+// End ElasticsearchToEnumerableConverterRule.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
new file mode 100644
index 0000000..3d774dd
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Identifies current ES version at runtime. Some queries have different syntax
+ * depending on version (eg. 2 vs 5).
+ */
+enum ElasticsearchVersion {
+
+ ES2,
+ ES5,
+ ES6,
+ ES7,
+ UNKNOWN;
+
+ static ElasticsearchVersion fromString(String version) {
+ Objects.requireNonNull(version, "version");
+ if (!version.matches("\\d+\\.\\d+\\.\\d+")) {
+ final String message = String.format(Locale.ROOT, "Wrong version format. "
+ + "Expected ${digit}.${digit}.${digit} but got %s", version);
+ throw new IllegalArgumentException(message);
+ }
+
+ // version format is: major.minor.revision
+ final int major = Integer.parseInt(version.substring(0, version.indexOf(".")));
+ if (major == 2) {
+ return ES2;
+ } else if (major == 5) {
+ return ES5;
+ } else if (major == 6) {
+ return ES6;
+ } else if (major == 7) {
+ return ES7;
+ } else {
+ return UNKNOWN;
+ }
+ }
+}
+
+// End ElasticsearchVersion.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/MapProjectionFieldVisitor.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/MapProjectionFieldVisitor.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/MapProjectionFieldVisitor.java
new file mode 100644
index 0000000..0e5b556
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/MapProjectionFieldVisitor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+/**
+ * Visitor that extracts the actual field name from an item expression.
+ */
+class MapProjectionFieldVisitor extends RexVisitorImpl<String> {
+
+  /** Single shared instance; the constructor is private. */
+  static final MapProjectionFieldVisitor INSTANCE = new MapProjectionFieldVisitor();
+
+  private MapProjectionFieldVisitor() {
+    super(true);
+  }
+
+  /**
+   * Returns the second operand of an {@code ITEM} call as a string; any other
+   * call is delegated to the superclass.
+   *
+   * @param call Call to inspect
+   * @return String value of the {@code ITEM} key, or whatever the superclass
+   *     returns for non-{@code ITEM} calls
+   */
+  @Override public String visitCall(RexCall call) {
+    if (call.op != SqlStdOperatorTable.ITEM) {
+      return super.visitCall(call);
+    }
+    // operand 0 is the container being indexed, operand 1 the key;
+    // the key is cast to a literal without a type check
+    final RexLiteral key = (RexLiteral) call.getOperands().get(1);
+    return key.getValueAs(String.class);
+  }
+}
+
+// End MapProjectionFieldVisitor.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/0204f286/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
new file mode 100644
index 0000000..ee49d60
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Query provider based on an Elasticsearch DB.
+ */
+@PackageMarker
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java