You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/05/31 23:24:07 UTC
[09/14] calcite git commit: [CALCITE-1253] Elasticsearch adapter
(Subhobrata Dey)
[CALCITE-1253] Elasticsearch adapter (Subhobrata Dey)
Close apache/calcite#236
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/f3caf13b
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/f3caf13b
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/f3caf13b
Branch: refs/heads/branch-avatica-1.8
Commit: f3caf13b9f1cd92f95dcf27716466bf2133e1ed7
Parents: b76affc
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Sat May 21 15:33:32 2016 -0400
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu May 26 14:57:06 2016 -0700
----------------------------------------------------------------------
elasticsearch/pom.xml | 142 ++++++++++
.../elasticsearch/ElasticsearchEnumerator.java | 151 ++++++++++
.../elasticsearch/ElasticsearchFilter.java | 284 +++++++++++++++++++
.../elasticsearch/ElasticsearchMethod.java | 50 ++++
.../elasticsearch/ElasticsearchProject.java | 95 +++++++
.../adapter/elasticsearch/ElasticsearchRel.java | 58 ++++
.../elasticsearch/ElasticsearchRules.java | 240 ++++++++++++++++
.../elasticsearch/ElasticsearchSchema.java | 125 ++++++++
.../ElasticsearchSchemaFactory.java | 63 ++++
.../elasticsearch/ElasticsearchSort.java | 93 ++++++
.../elasticsearch/ElasticsearchTable.java | 150 ++++++++++
.../elasticsearch/ElasticsearchTableScan.java | 88 ++++++
.../ElasticsearchToEnumerableConverter.java | 124 ++++++++
.../ElasticsearchToEnumerableConverterRule.java | 42 +++
.../adapter/elasticsearch/package-info.java | 26 ++
.../calcite/test/ElasticsearchAdapterIT.java | 270 ++++++++++++++++++
.../resources/elasticsearch-zips-model.json | 50 ++++
.../src/test/resources/log4j.properties | 24 ++
pom.xml | 2 +
site/_docs/adapter.md | 1 +
site/_docs/elasticsearch_adapter.md | 136 +++++++++
sqlline | 2 +-
sqlline.bat | 2 +-
23 files changed, 2216 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
new file mode 100644
index 0000000..fc6df83
--- /dev/null
+++ b/elasticsearch/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite</artifactId>
+ <version>1.8.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>calcite-elasticsearch</artifactId>
+ <packaging>jar</packaging>
+ <version>1.8.0-SNAPSHOT</version>
+ <name>Calcite Elasticsearch</name>
+ <description>Elasticsearch adapter for Calcite</description>
+
+ <properties>
+ <top.dir>${project.basedir}/..</top.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch-java-driver.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch</groupId>
+ <artifactId>hppc</artifactId>
+ <version>0.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>${maven-dependency-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>analyze</id>
+ <goals>
+ <goal>analyze-only</goal>
+ </goals>
+ <configuration>
+ <failOnWarning>true</failOnWarning>
+ <!-- ignore "unused but declared" warnings -->
+ <ignoredUnusedDeclaredDependencies>
+ <ignoredUnusedDeclaredDependency>org.apache.calcite.avatica:avatica</ignoredUnusedDeclaredDependency>
+ <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-api</ignoredUnusedDeclaredDependency>
+ <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-log4j12</ignoredUnusedDeclaredDependency>
+ </ignoredUnusedDeclaredDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ </plugin>
+ <!-- Parent module has the same plugin and does the work of
+ generating -sources.jar for each project. But without the
+ plugin declared here, IDEs don't know the sources are
+ available. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ <goal>test-jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java
new file mode 100644
index 0000000..e7478f5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.Primitive;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Enumerator} over the hits returned by an Elasticsearch search.
+ *
+ * <p>Each {@link SearchHit} taken from the underlying iterator is turned into
+ * a row object by the getter function supplied at construction time.
+ */
+public class ElasticsearchEnumerator implements Enumerator<Object> {
+  private final Iterator<SearchHit> cursor;
+  private final Function1<SearchHit, Object> getter;
+  private Object current;
+
+  /**
+   * Creates an ElasticsearchEnumerator.
+   *
+   * @param cursor Iterator over Elasticsearch {@link SearchHit} objects
+   * @param getter Function that turns a hit into the row object
+   */
+  public ElasticsearchEnumerator(Iterator<SearchHit> cursor, Function1<SearchHit, Object> getter) {
+    this.cursor = cursor;
+    this.getter = getter;
+  }
+
+  /** Returns the row produced by the last successful {@link #moveNext()}. */
+  public Object current() {
+    return current;
+  }
+
+  /**
+   * Advances to the next hit, if any.
+   *
+   * @return whether a row is now available; when the iterator is exhausted
+   *     the current row is reset to null
+   */
+  public boolean moveNext() {
+    if (!cursor.hasNext()) {
+      current = null;
+      return false;
+    }
+    current = getter.apply(cursor.next());
+    return true;
+  }
+
+  /** Not supported; always throws. */
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void close() {
+    // nothing to do
+  }
+
+  /** Returns a function that yields the hit's fields map as the row. */
+  private static Function1<SearchHit, Map> mapGetter() {
+    return new Function1<SearchHit, Map>() {
+      public Map apply(SearchHit hit) {
+        final Object fieldMap = hit.fields();
+        return (Map) fieldMap;
+      }
+    };
+  }
+
+  /**
+   * Returns a function used when the row has exactly one column. It converts
+   * the hit's source map when the hit carries no fields, otherwise the hit's
+   * fields map.
+   *
+   * <p>NOTE(review): {@code fieldName} is not used here; the whole map is
+   * converted rather than a single entry. Confirm that this is intended.
+   */
+  private static Function1<SearchHit, Object> singletonGetter(final String fieldName,
+      final Class fieldClass) {
+    return new Function1<SearchHit, Object>() {
+      public Object apply(SearchHit hit) {
+        final Object raw = hit.fields().isEmpty() ? hit.getSource() : hit.getFields();
+        return convert(raw, fieldClass);
+      }
+    };
+  }
+
+  /**
+   * Returns a function that extracts a given set of fields from
+   * {@link SearchHit} objects into an array, one slot per requested field.
+   *
+   * @param fields List of (name, class) pairs of the fields to project
+   */
+  private static Function1<SearchHit, Object[]> listGetter(
+      final List<Map.Entry<String, Class>> fields) {
+    return new Function1<SearchHit, Object[]>() {
+      public Object[] apply(SearchHit hit) {
+        final Object[] row = new Object[fields.size()];
+        int slot = 0;
+        for (Map.Entry<String, Class> field : fields) {
+          final String name = field.getKey();
+          final Object raw;
+          if (hit.fields().isEmpty()) {
+            // No explicit fields on the hit: read the value from the source.
+            raw = hit.getSource().get(name);
+          } else {
+            raw = hit.field(name).getValue();
+          }
+          row[slot++] = convert(raw, field.getValue());
+        }
+        return row;
+      }
+    };
+  }
+
+  /**
+   * Picks the row-building function for a field list: the map getter when
+   * {@code fields} is null, the singleton getter for exactly one field, and
+   * the array getter otherwise.
+   */
+  static Function1<SearchHit, Object> getter(List<Map.Entry<String, Class>> fields) {
+    if (fields == null) {
+      //noinspection unchecked
+      return (Function1) mapGetter();
+    }
+    if (fields.size() == 1) {
+      final Map.Entry<String, Class> only = fields.get(0);
+      return singletonGetter(only.getKey(), only.getValue());
+    }
+    //noinspection unchecked
+    return (Function1) listGetter(fields);
+  }
+
+  /**
+   * Coerces a raw value towards the requested class.
+   *
+   * <p>Values that are null, or already an instance of the (boxed) target
+   * class, are returned as is. When the target is a primitive or boxed
+   * primitive, a {@link Date} is first reduced to a day count
+   * (milliseconds divided by {@link DateTimeUtils#MILLIS_PER_DAY}) and any
+   * {@link Number} is narrowed or widened via {@link Primitive#number}.
+   * Anything else is returned unchanged.
+   */
+  private static Object convert(Object o, Class clazz) {
+    if (o == null) {
+      return null;
+    }
+    final Primitive unboxed = Primitive.of(clazz);
+    final Primitive primitive = unboxed != null ? unboxed : Primitive.ofBox(clazz);
+    final Class target = unboxed != null ? unboxed.boxClass : clazz;
+    if (target.isInstance(o)) {
+      return o;
+    }
+    if (primitive == null) {
+      return o;
+    }
+    Object value = o;
+    if (value instanceof Date) {
+      value = ((Date) value).getTime() / DateTimeUtils.MILLIS_PER_DAY;
+    }
+    if (value instanceof Number) {
+      return primitive.number((Number) value);
+    }
+    return value;
+  }
+}
+
+// End ElasticsearchEnumerator.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
new file mode 100644
index 0000000..f11a7b5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.JsonBuilder;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Elasticsearch.
+ *
+ * <p>When implemented, the filter condition is rendered as a
+ * {@code "query"} fragment (a {@code constant_score} filter) and appended to
+ * the {@link Implementor}'s list of operations.
+ */
+public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
+  /**
+   * Creates an ElasticsearchFilter.
+   *
+   * @param cluster Cluster this node belongs to
+   * @param traitSet Traits; the convention must be
+   *     {@link ElasticsearchRel#CONVENTION}
+   * @param child Input, which must have the same convention
+   * @param condition Filter predicate
+   */
+  public ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+      RexNode condition) {
+    super(cluster, traitSet, child, condition);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /**
+   * Returns one tenth of the default filter cost (presumably so that the
+   * planner favors evaluating the filter inside Elasticsearch).
+   */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override public Filter copy(RelTraitSet relTraitSet, RelNode input, RexNode condition) {
+    return new ElasticsearchFilter(getCluster(), relTraitSet, input, condition);
+  }
+
+  /**
+   * Implements the input first, then appends this filter's condition,
+   * translated to an Elasticsearch {@code "query"} fragment.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    Translator translator = new Translator(ElasticsearchRules
+        .elasticsearchFieldNames(getRowType()));
+    String match = translator.translateMatch(condition);
+    implementor.add(match);
+  }
+
+  /**
+   * Translates {@link RexNode} expressions into Elasticsearch expression strings.
+   *
+   * <p>The translator keeps per-conjunction scratch state ({@link #eqMap} and
+   * {@link #multimap}) that is cleared at the start of each call to
+   * {@code translateAnd}.
+   */
+  static class Translator {
+    final JsonBuilder builder = new JsonBuilder();
+    /** Non-equality predicates gathered per field: (operator, literal). */
+    final Multimap<String, Pair<String, RexLiteral>> multimap =
+        HashMultimap.create();
+    /** Equality predicates gathered per field. */
+    final Map<String, RexLiteral> eqMap = new LinkedHashMap<>();
+    private final List<String> fieldNames;
+
+    Translator(List<String> fieldNames) {
+      this.fieldNames = fieldNames;
+    }
+
+    /**
+     * Renders a condition as {@code "query" : {"constant_score":{"filter":...}}},
+     * with all whitespace removed and the JSON lower-cased.
+     */
+    private String translateMatch(RexNode condition) {
+      // filter node
+      final Map<String, Object> filterMap = new LinkedHashMap<>();
+      filterMap.put("filter", translateOr(condition));
+
+      // constant_score node
+      final Map<String, Object> map = builder.map();
+      map.put("constant_score", filterMap);
+
+      // Lower-case with a fixed locale so that the generated JSON does not
+      // depend on the JVM's default locale (e.g. the Turkish dotless i).
+      return "\"query\" : " + builder.toJsonString(map).replaceAll("\\s+", "")
+          .toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Translates a condition that may be an OR of other conditions.
+     *
+     * <p>Each disjunct becomes a {@code bool}/{@code must} clause over its
+     * conjuncts. A disjunct that contributes no predicates at all becomes a
+     * {@code match_all} clause. Several disjuncts are combined under
+     * {@code bool}/{@code should}; a single disjunct is returned directly.
+     */
+    private Object translateOr(RexNode condition) {
+      final List<Object> list = new ArrayList<>();
+
+      final List<RexNode> orNodes = RelOptUtil.disjunctions(condition);
+      for (RexNode node : orNodes) {
+        List<Map<String, Object>> andNodes = translateAnd(node);
+
+        if (andNodes.isEmpty()) {
+          // Nothing to test for this disjunct, so it accepts every document.
+          // Previously this path called andNodes.get(0) on the empty list
+          // and always failed with IndexOutOfBoundsException.
+          Map<String, Object> matchAll = new LinkedHashMap<>();
+          matchAll.put("match_all", new LinkedHashMap<String, Object>());
+          list.add(matchAll);
+        } else {
+          Map<String, Object> andClause = new HashMap<>();
+          andClause.put("must", andNodes);
+
+          // boolean filters
+          LinkedHashMap<String, Object> filterEvaluator = new LinkedHashMap<>();
+          filterEvaluator.put("bool", andClause);
+          list.add(filterEvaluator);
+        }
+      }
+
+      if (orNodes.size() > 1) {
+        Map<String, Object> map = builder.map();
+        map.put("should", list);
+
+        // boolean filters
+        LinkedHashMap<String, Object> filterEvaluator = new LinkedHashMap<>();
+        filterEvaluator.put("bool", map);
+        return filterEvaluator;
+      } else {
+        // NOTE(review): if the condition decomposes into no disjuncts at
+        // all, the list is empty and this still throws. Confirm whether
+        // such a condition can reach this point.
+        return list.get(0);
+      }
+    }
+
+    /**
+     * Records the bound {@code v} for operator {@code op}, unless a bound
+     * already recorded for that operator is stronger.
+     */
+    private void addPredicate(Map<String, Object> map, String op, Object v) {
+      if (map.containsKey(op) && stronger(op, map.get(op), v)) {
+        return;
+      }
+      map.put(op, v);
+    }
+
+    /**
+     * Translates a condition that may be an AND of other conditions. Gathers
+     * together conditions that apply to the same field.
+     *
+     * <p>Equality predicates become {@code term} filters and suppress any
+     * other predicates gathered on the same field. The remaining predicates
+     * become {@code range} filters, or a {@code not}/{@code term} filter for
+     * inequality.
+     *
+     * @return one filter map per field; empty if no predicate was gathered
+     */
+    private List<Map<String, Object>> translateAnd(RexNode node0) {
+      eqMap.clear();
+      multimap.clear();
+      for (RexNode node : RelOptUtil.conjunctions(node0)) {
+        translateMatch2(node);
+      }
+      List<Map<String, Object>> filters = new ArrayList<>();
+      for (Map.Entry<String, RexLiteral> entry : eqMap.entrySet()) {
+        // An equality on a field supersedes its other predicates.
+        multimap.removeAll(entry.getKey());
+
+        Map<String, Object> filter = new HashMap<>();
+        filter.put(entry.getKey(), literalValue(entry.getValue()));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("term", filter);
+        filters.add(map);
+      }
+      for (Map.Entry<String, Collection<Pair<String, RexLiteral>>> entry
+          : multimap.asMap().entrySet()) {
+        // NOTE(review): map2 is shared by the range bounds and the "not"
+        // term of the same field. Confirm this is intended when a field
+        // has both kinds of predicate.
+        Map<String, Object> map2 = builder.map();
+
+        Map<String, Object> map = new HashMap<>();
+        for (Pair<String, RexLiteral> s : entry.getValue()) {
+          if (!s.left.equals("not")) {
+            addPredicate(map2, s.left, literalValue(s.right));
+
+            Map<String, Object> filter = new HashMap<>();
+            filter.put(entry.getKey(), map2);
+
+            map.put("range", filter);
+          } else {
+            map2.put(entry.getKey(), literalValue(s.right));
+
+            Map<String, Object> termMap = new HashMap<>();
+            termMap.put("term", map2);
+
+            map.put("not", termMap);
+          }
+        }
+        filters.add(map);
+      }
+      return filters;
+    }
+
+    /**
+     * Returns whether the existing bound {@code v0} is stricter than the new
+     * bound {@code v1} for the given range operator.
+     *
+     * <p>For {@code lt}/{@code lte} the smaller value is stricter; for
+     * {@code gt}/{@code gte} the larger one. Only pairs of numbers or pairs
+     * of strings are compared; anything else yields false.
+     */
+    private boolean stronger(String key, Object v0, Object v1) {
+      if (key.equals("lt") || key.equals("lte")) {
+        if (v0 instanceof Number && v1 instanceof Number) {
+          return ((Number) v0).doubleValue() < ((Number) v1).doubleValue();
+        }
+        if (v0 instanceof String && v1 instanceof String) {
+          return v0.toString().compareTo(v1.toString()) < 0;
+        }
+      }
+      if (key.equals("gt") || key.equals("gte")) {
+        // Lower bounds compare the other way round.
+        return stronger("lt", v1, v0);
+      }
+      return false;
+    }
+
+    private static Object literalValue(RexLiteral literal) {
+      return literal.getValue2();
+    }
+
+    /**
+     * Translates a single comparison, recording it in {@link #eqMap} or
+     * {@link #multimap}. Each case passes the operator name together with the
+     * name to use if the operands have to be swapped.
+     *
+     * @throws AssertionError if the node is not a supported comparison
+     */
+    private Void translateMatch2(RexNode node) {
+      switch (node.getKind()) {
+      case EQUALS:
+        return translateBinary(null, null, (RexCall) node);
+      case LESS_THAN:
+        return translateBinary("lt", "gt", (RexCall) node);
+      case LESS_THAN_OR_EQUAL:
+        return translateBinary("lte", "gte", (RexCall) node);
+      case NOT_EQUALS:
+        return translateBinary("not", "not", (RexCall) node);
+      case GREATER_THAN:
+        return translateBinary("gt", "lt", (RexCall) node);
+      case GREATER_THAN_OR_EQUAL:
+        return translateBinary("gte", "lte", (RexCall) node);
+      default:
+        throw new AssertionError("cannot translate " + node);
+      }
+    }
+
+    /**
+     * Translates a call to a binary operator, reversing arguments if
+     * necessary.
+     *
+     * @param op Operator name when the operands are used as written
+     * @param rop Operator name when the operands are swapped
+     * @throws AssertionError if neither operand order can be translated
+     */
+    private Void translateBinary(String op, String rop, RexCall call) {
+      final RexNode left = call.operands.get(0);
+      final RexNode right = call.operands.get(1);
+      boolean b = translateBinary2(op, left, right);
+      if (b) {
+        return null;
+      }
+      b = translateBinary2(rop, right, left);
+      if (b) {
+        return null;
+      }
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+
+    /**
+     * Translates a call to a binary operator. Returns whether successful.
+     *
+     * <p>Succeeds only when {@code right} is a literal and {@code left} is an
+     * input reference, an item access recognized by
+     * {@code ElasticsearchRules.isItem}, or a cast of either.
+     */
+    private boolean translateBinary2(String op, RexNode left, RexNode right) {
+      switch (right.getKind()) {
+      case LITERAL:
+        break;
+      default:
+        return false;
+      }
+      final RexLiteral rightLiteral = (RexLiteral) right;
+      switch (left.getKind()) {
+      case INPUT_REF:
+        final RexInputRef left1 = (RexInputRef) left;
+        String name = fieldNames.get(left1.getIndex());
+        translateOp2(op, name, rightLiteral);
+        return true;
+      case CAST:
+        // Look through the cast and translate its operand.
+        return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+      case OTHER_FUNCTION:
+        String itemName = ElasticsearchRules.isItem((RexCall) left);
+        if (itemName != null) {
+          translateOp2(op, itemName, rightLiteral);
+          return true;
+        }
+        // fall through
+      default:
+        return false;
+      }
+    }
+
+    /**
+     * Records a predicate on a field: equality (null {@code op}) goes to
+     * {@link #eqMap}, every other operator to {@link #multimap}.
+     */
+    private void translateOp2(String op, String name, RexLiteral right) {
+      if (op == null) {
+        eqMap.put(name, right);
+      } else {
+        multimap.put(name, Pair.of(op, right));
+      }
+    }
+  }
+}
+
+// End ElasticsearchFilter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
new file mode 100644
index 0000000..a0b3af6
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.linq4j.tree.Types;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Builtin methods in the Elasticsearch adapter.
+ *
+ * <p>Each constant resolves a {@link Method} by reflection when the enum is
+ * initialized; {@link #MAP} indexes the constants by that method.
+ */
+enum ElasticsearchMethod {
+  ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class, "find",
+      List.class, List.class);
+
+  /** The reflective method this constant stands for. */
+  public final Method method;
+
+  /** Lookup from reflective method to enum constant. */
+  public static final ImmutableMap<Method, ElasticsearchMethod> MAP = indexByMethod();
+
+  ElasticsearchMethod(Class clazz, String methodName, Class... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+
+  /** Builds the immutable method-to-constant index over all constants. */
+  private static ImmutableMap<Method, ElasticsearchMethod> indexByMethod() {
+    final ImmutableMap.Builder<Method, ElasticsearchMethod> index = ImmutableMap.builder();
+    for (ElasticsearchMethod constant : values()) {
+      index.put(constant.method, constant);
+    }
+    return index.build();
+  }
+}
+
+// End ElasticsearchMethod.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
new file mode 100644
index 0000000..c2c09a5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Elasticsearch.
+ */
+public class ElasticsearchProject extends Project implements ElasticsearchRel {
+ public ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+ List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traitSet, input, projects, rowType);
+ assert getConvention() == ElasticsearchRel.CONVENTION;
+ assert getConvention() == input.getConvention();
+ }
+
+ @Override public Project copy(RelTraitSet relTraitSet, RelNode input, List<RexNode> projects,
+ RelDataType relDataType) {
+ return new ElasticsearchProject(getCluster(), traitSet, input, projects, relDataType);
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ @Override public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+
+ final ElasticsearchRules.RexToElasticsearchTranslator translator =
+ new ElasticsearchRules.RexToElasticsearchTranslator(
+ (JavaTypeFactory) getCluster().getTypeFactory(),
+ ElasticsearchRules.elasticsearchFieldNames(getInput().getRowType()));
+
+ final List<String> findItems = new ArrayList<>();
+ final List<String> scriptFieldItems = new ArrayList<>();
+ for (Pair<RexNode, String> pair: getNamedProjects()) {
+ final String name = pair.right;
+ final String expr = pair.left.accept(translator);
+
+ if (expr.equals("\"" + name + "\"")) {
+ findItems.add(ElasticsearchRules.quote(name));
+ } else if (expr.matches("\"literal\":.+")) {
+ scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\": "
+ + expr.split(":")[1] + "}");
+ } else {
+ scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\":\"_source."
+ + expr.replaceAll("\"", "") + "\"}");
+ }
+ }
+ final String findString = Util.toString(findItems, "", ", ", "");
+ final String scriptFieldString = "\"script_fields\": {"
+ + Util.toString(scriptFieldItems, "", ", ", "") + "}";
+ final String fieldString = "\"fields\" : [" + findString + "]"
+ + ", " + scriptFieldString;
+
+ for (String opfield : implementor.list) {
+ if (opfield.startsWith("\"fields\"")) {
+ implementor.list.remove(opfield);
+ }
+ }
+ implementor.add(fieldString);
+ }
+}
+
+// End ElasticsearchProject.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
new file mode 100644
index 0000000..e24cb0d
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Relational expression that uses Elasticsearch calling convention.
+ */
+public interface ElasticsearchRel extends RelNode {
+  /**
+   * Calling convention for relational operations that occur in Elasticsearch.
+   */
+  Convention CONVENTION = new Convention.Impl("ELASTICSEARCH", ElasticsearchRel.class);
+
+  /**
+   * Contributes this node's part of the query to the given implementor.
+   *
+   * @param implementor Collector of the query operations
+   */
+  void implement(Implementor implementor);
+
+  /**
+   * Callback for the implementation process that converts a tree of
+   * {@link ElasticsearchRel} nodes into an Elasticsearch query.
+   */
+  class Implementor {
+    /** Query operations gathered so far, in the order they were added. */
+    final List<String> list = new ArrayList<>();
+
+    RelOptTable table;
+    ElasticsearchTable elasticsearchTable;
+
+    /** Appends one operation to the query being built. */
+    public void add(String findOp) {
+      list.add(findOp);
+    }
+
+    /**
+     * Asks an input to implement itself against this implementor. Only a
+     * single input (ordinal 0) is expected, and it must be an
+     * {@link ElasticsearchRel}.
+     */
+    public void visitChild(int ordinal, RelNode input) {
+      assert ordinal == 0;
+      final ElasticsearchRel child = (ElasticsearchRel) input;
+      child.implement(this);
+    }
+  }
+}
+
+// End ElasticsearchRel.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
new file mode 100644
index 0000000..2e68156
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.slf4j.Logger;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rules and relational operators for
+ * {@link ElasticsearchRel#CONVENTION ELASTICSEARCH}
+ * calling convention.
+ */
+class ElasticsearchRules {
+ /** Logger obtained from the planner tracer. */
+ protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
+ /**
+ * Rules that convert sort, filter and project expressions to the
+ * Elasticsearch calling convention.
+ */
+ static final RelOptRule[] RULES = {
+ ElasticsearchSortRule.INSTANCE,
+ ElasticsearchFilterRule.INSTANCE,
+ ElasticsearchProjectRule.INSTANCE
+ };
+
+ /** Private constructor: the class is never instantiated from outside. */
+ private ElasticsearchRules() {}
+
+  /**
+   * Returns 'string' if {@code call} is a call to item['string'] on input
+   * field 0, null otherwise.
+   *
+   * @param call Call to inspect
+   * @return Key of the item access, or null if {@code call} is not an
+   *     {@code ITEM} call on input field 0 with a string literal key
+   */
+  static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    final List<RexNode> operands = call.getOperands();
+    final RexNode target = operands.get(0);
+    final RexNode key = operands.get(1);
+
+    // Only an item access on input field 0 qualifies.
+    if (!(target instanceof RexInputRef) || ((RexInputRef) target).getIndex() != 0) {
+      return null;
+    }
+    if (!(key instanceof RexLiteral)) {
+      return null;
+    }
+    final Object value = ((RexLiteral) key).getValue2();
+    return value instanceof String ? (String) value : null;
+  }
+
+  /**
+   * Returns the field names of a row type, adjusted and made unique.
+   *
+   * <p>A name that starts with "$" is replaced by "_" followed by the name
+   * from its third character on; other names are used unchanged. The
+   * resulting list is passed through {@link SqlValidatorUtil#uniquify}.
+   *
+   * @param rowType Row type whose field names are wanted
+   * @return Field names, one per field of {@code rowType}
+   */
+  static List<String> elasticsearchFieldNames(final RelDataType rowType) {
+    // Lazy view over the row type; names are computed on access.
+    final List<String> names = new AbstractList<String>() {
+      @Override public int size() {
+        return rowType.getFieldCount();
+      }
+
+      @Override public String get(int index) {
+        final String fieldName = rowType.getFieldList().get(index).getName();
+        if (fieldName.startsWith("$")) {
+          return "_" + fieldName.substring(2);
+        }
+        return fieldName;
+      }
+    };
+    return SqlValidatorUtil.uniquify(names);
+  }
+
+  /**
+   * Wraps a string in double quotes. The content is not escaped.
+   *
+   * @param s String to wrap
+   * @return {@code s} preceded and followed by a double quote character
+   */
+  static String quote(String s) {
+    final StringBuilder quoted = new StringBuilder();
+    quoted.append('"').append(s).append('"');
+    return quoted.toString();
+  }
+
+  /**
+   * Translator from {@link RexNode} to strings in Elasticsearch's expression
+   * language.
+   *
+   * <p>Input references and item accesses recognized by
+   * {@link #isItem(RexCall)} become double-quoted field names, non-null
+   * literals become a {@code "literal":"..."} pair, and casts pass their
+   * operand through. Any other call is rejected with an
+   * {@link IllegalArgumentException}.
+   */
+  static class RexToElasticsearchTranslator extends RexVisitorImpl<String> {
+    private final JavaTypeFactory typeFactory;
+    private final List<String> inFields;
+
+    /**
+     * Creates a translator.
+     *
+     * @param typeFactory Type factory used when translating literals
+     * @param inFields Names of the input fields, indexed by field ordinal
+     */
+    RexToElasticsearchTranslator(JavaTypeFactory typeFactory, List<String> inFields) {
+      super(true);
+      this.typeFactory = typeFactory;
+      this.inFields = inFields;
+    }
+
+    /**
+     * Translates a literal.
+     *
+     * @return "null" for a null literal, otherwise the literal's translated
+     *     value wrapped as {@code "literal":"<value>"}
+     */
+    @Override public String visitLiteral(RexLiteral literal) {
+      if (literal.getValue() == null) {
+        return "null";
+      }
+      return "\"literal\":\""
+          + RexToLixTranslator.translateLiteral(literal, literal.getType(),
+              typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
+          + "\"";
+    }
+
+    /** Translates an input reference to the quoted name of the input field. */
+    @Override public String visitInputRef(RexInputRef inputRef) {
+      return quote(inFields.get(inputRef.getIndex()));
+    }
+
+    /**
+     * Translates a call.
+     *
+     * @throws IllegalArgumentException if the call is not an item access or
+     *     a cast
+     */
+    @Override public String visitCall(RexCall call) {
+      final String name = isItem(call);
+      if (name != null) {
+        return "\"" + name + "\"";
+      }
+
+      final List<String> strings = visitList(call.operands);
+      if (call.getKind() == SqlKind.CAST) {
+        // A cast is transparent; only a leading "$" of the operand is dropped.
+        final String operand = strings.get(0);
+        return operand.startsWith("$") ? operand.substring(1) : operand;
+      }
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        final RexNode op1 = call.getOperands().get(1);
+        if (op1 instanceof RexLiteral && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          return stripQuotes(strings.get(0)) + "[" + ((RexLiteral) op1).getValue2() + "]";
+        }
+      }
+      throw new IllegalArgumentException("Translation of " + call
+          + " is not supported by ElasticsearchProject");
+    }
+
+    /**
+     * Removes one pair of enclosing single quotes, if present.
+     *
+     * <p>NOTE(review): the other methods of this class emit names in double
+     * quotes, which this method leaves in place; confirm whether double
+     * quotes should be stripped as well.
+     */
+    private String stripQuotes(String s) {
+      // The length check keeps a lone "'" from being treated as a quoted string.
+      return s.length() >= 2 && s.startsWith("'") && s.endsWith("'")
+          ? s.substring(1, s.length() - 1)
+          : s;
+    }
+
+    /**
+     * Translates each expression of a list.
+     *
+     * @param list Expressions to translate
+     * @return Translations, in the order of {@code list}
+     */
+    List<String> visitList(List<RexNode> list) {
+      final List<String> strings = new ArrayList<>();
+      for (RexNode node : list) {
+        strings.add(node.accept(this));
+      }
+      return strings;
+    }
+  }
+
+ /**
+ * Base class for planner rules that convert a relational expression to
+ * Elasticsearch calling convention.
+ *
+ * <p>It only remembers the target convention so that subclasses can use it
+ * when they build the converted expression.
+ */
+ abstract static class ElasticsearchConverterRule extends ConverterRule {
+ /** Target convention, as given to the constructor. */
+ final Convention out;
+
+ /**
+ * Creates a rule. All arguments are forwarded unchanged to the
+ * {@link ConverterRule} constructor.
+ *
+ * @param clazz Class of relational expression handled by the rule
+ * @param in Input trait
+ * @param out Target convention; also kept in {@link #out}
+ * @param description Description of the rule
+ */
+ ElasticsearchConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out,
+ String description) {
+ super(clazz, in, out, description);
+ this.out = out;
+ }
+ }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to an
+   * {@link ElasticsearchSort}.
+   */
+  private static class ElasticsearchSortRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchSortRule INSTANCE = new ElasticsearchSortRule();
+
+    private ElasticsearchSortRule() {
+      super(Sort.class, Convention.NONE, ElasticsearchRel.CONVENTION, "ElasticsearchSortRule");
+    }
+
+    /**
+     * Builds an {@link ElasticsearchSort} with the collation, offset and fetch
+     * of the given sort.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final Sort sort = (Sort) relNode;
+      final RelTraitSet sortTraits =
+          sort.getTraitSet().replace(out).replace(sort.getCollation());
+      // The input is requested in the target convention with an empty collation.
+      final RelTraitSet inputTraits = sortTraits.replace(RelCollations.EMPTY);
+      final RelNode convertedInput = convert(sort.getInput(), inputTraits);
+      return new ElasticsearchSort(relNode.getCluster(), sortTraits, convertedInput,
+          sort.getCollation(), sort.offset, sort.fetch);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to an
+   * {@link ElasticsearchFilter}.
+   */
+  private static class ElasticsearchFilterRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+    private ElasticsearchFilterRule() {
+      super(LogicalFilter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+          "ElasticsearchFilterRule");
+    }
+
+    /**
+     * Builds an {@link ElasticsearchFilter} with the condition of the given
+     * filter, over the filter's input converted to the target convention.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalFilter filter = (LogicalFilter) relNode;
+      final RelTraitSet filterTraits = filter.getTraitSet().replace(out);
+      final RelNode convertedInput = convert(filter.getInput(), out);
+      return new ElasticsearchFilter(relNode.getCluster(), filterTraits, convertedInput,
+          filter.getCondition());
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+   * to an {@link ElasticsearchProject}.
+   */
+  private static class ElasticsearchProjectRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchProjectRule INSTANCE = new ElasticsearchProjectRule();
+
+    private ElasticsearchProjectRule() {
+      super(LogicalProject.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+          "ElasticsearchProjectRule");
+    }
+
+    /**
+     * Builds an {@link ElasticsearchProject} with the expressions and row type
+     * of the given project, over the project's input converted to the target
+     * convention.
+     */
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalProject project = (LogicalProject) relNode;
+      final RelTraitSet projectTraits = project.getTraitSet().replace(out);
+      final RelNode convertedInput = convert(project.getInput(), out);
+      return new ElasticsearchProject(project.getCluster(), projectTraits, convertedInput,
+          project.getProjects(), project.getRowType());
+    }
+  }
+}
+
+// End ElasticsearchRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
new file mode 100644
index 0000000..e59e0a4
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema mapped onto an index of ELASTICSEARCH types.
+ *
+ * <p>Each table in the schema is an ELASTICSEARCH type in that index.
+ */
+public class ElasticsearchSchema extends AbstractSchema {
+  /** Name of the index, or null if the requested name did not match exactly one index. */
+  final String index;
+
+  /** Client connected to the cluster; created once by the constructor. */
+  private final Client client;
+
+  /**
+   * Creates an Elasticsearch schema.
+   *
+   * <p>Connects to the given nodes and looks up the index. If the connection
+   * or the lookup fails, the client is closed before the exception is
+   * propagated.
+   *
+   * @param coordinates Map of Elasticsearch node locations (host, port)
+   * @param userConfig Map of user-specified configurations
+   * @param indexName Elasticsearch database name, e.g. "usa".
+   * @throws RuntimeException if none of the nodes can be reached
+   */
+  ElasticsearchSchema(Map<String, Integer> coordinates,
+      Map<String, String> userConfig, String indexName) {
+    super();
+
+    final List<InetSocketAddress> transportAddresses = new ArrayList<>();
+    for (Map.Entry<String, Integer> coordinate : coordinates.entrySet()) {
+      transportAddresses.add(new InetSocketAddress(coordinate.getKey(), coordinate.getValue()));
+    }
+
+    client = open(transportAddresses, userConfig);
+
+    final String[] indices;
+    try {
+      indices = client.admin().indices()
+          .getIndex(new GetIndexRequest().indices(indexName))
+          .actionGet().getIndices();
+    } catch (RuntimeException e) {
+      // The schema is never handed out, so nobody else could close the client.
+      client.close();
+      throw e;
+    }
+    index = indices.length == 1 ? indices[0] : null;
+  }
+
+  /**
+   * Returns one {@link ElasticsearchTable} per type found in the mappings of
+   * the index.
+   */
+  @Override protected Map<String, Table> getTableMap() {
+    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+
+    try {
+      GetMappingsResponse response = client.admin().indices().getMappings(
+          new GetMappingsRequest().indices(index)).get();
+      ImmutableOpenMap<String, MappingMetaData> mapping = response.getMappings().get(index);
+      for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
+        builder.put(c.key, new ElasticsearchTable(client, index, c.key));
+      }
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller's thread.
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates a transport client for the given nodes.
+   *
+   * @param transportAddresses Addresses of the nodes to connect to
+   * @param userConfig Settings for the client
+   * @return Client with at least one connected node
+   * @throws RuntimeException if no node is connected; the client is closed
+   *     first
+   */
+  private static Client open(List<InetSocketAddress> transportAddresses,
+      Map<String, String> userConfig) {
+    final List<TransportAddress> transportNodes = new ArrayList<>(transportAddresses.size());
+    for (InetSocketAddress address : transportAddresses) {
+      transportNodes.add(new InetSocketTransportAddress(address));
+    }
+
+    final Settings settings = Settings.settingsBuilder().put(userConfig).build();
+
+    final TransportClient transportClient = TransportClient.builder().settings(settings).build();
+    for (TransportAddress transport : transportNodes) {
+      transportClient.addTransportAddress(transport);
+    }
+
+    final List<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
+    if (nodes.isEmpty()) {
+      // Release the client's resources; it is of no use without a node.
+      transportClient.close();
+      throw new RuntimeException("Cannot connect to any elasticsearch nodes");
+    }
+
+    return transportClient;
+  }
+}
+
+// End ElasticsearchSchema.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
new file mode 100644
index 0000000..41ffc10
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Factory that creates a {@link ElasticsearchSchema}.
+ *
+ * <p>Allows a custom schema to be included in a model.json file.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class ElasticsearchSchemaFactory implements SchemaFactory {
+
+  public ElasticsearchSchemaFactory() {
+  }
+
+  /**
+   * Creates a schema from the "coordinates", "userConfig" and "index"
+   * operands.
+   *
+   * <p>"coordinates" and "userConfig" are JSON objects given as strings;
+   * single quotes are accepted in them.
+   *
+   * @throws RuntimeException if either JSON string cannot be parsed
+   */
+  @Override public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+
+    final TypeReference<Map<String, Integer>> coordinatesType =
+        new TypeReference<Map<String, Integer>>() { };
+    final TypeReference<Map<String, String>> userConfigType =
+        new TypeReference<Map<String, String>>() { };
+
+    try {
+      final String coordinatesJson = (String) operand.get("coordinates");
+      final Map<String, Integer> coordinates =
+          mapper.readValue(coordinatesJson, coordinatesType);
+
+      final String userConfigJson = (String) operand.get("userConfig");
+      final Map<String, String> userConfig =
+          mapper.readValue(userConfigJson, userConfigType);
+
+      final String index = (String) operand.get("index");
+      return new ElasticsearchSchema(coordinates, userConfig, index);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot parse values from json", e);
+    }
+  }
+}
+
+// End ElasticsearchSchemaFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
new file mode 100644
index 0000000..5f5dfe8
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * relational expression in Elasticsearch.
+ *
+ * <p>The sort keys, offset and fetch are emitted as the "sort", "from" and
+ * "size" entries of the query.
+ */
+public class ElasticsearchSort extends Sort implements ElasticsearchRel {
+  /**
+   * Creates an ElasticsearchSort.
+   *
+   * @param cluster Cluster
+   * @param traitSet Trait set; its convention must be
+   *     {@link ElasticsearchRel#CONVENTION}
+   * @param child Input; must have the same convention
+   * @param collation Sort keys
+   * @param offset Number of rows to skip, or null
+   * @param fetch Maximum number of rows to return, or null
+   */
+  public ElasticsearchSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+      RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Returns the cost computed by the superclass, multiplied by 0.05. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  /**
+   * Creates a copy of this sort with the given traits, input, collation,
+   * offset and fetch.
+   */
+  @Override public Sort copy(RelTraitSet traitSet, RelNode relNode, RelCollation relCollation,
+      RexNode offset, RexNode fetch) {
+    // Use the collation that was passed in, not the one of this node.
+    return new ElasticsearchSort(getCluster(), traitSet, relNode, relCollation, offset, fetch);
+  }
+
+  /**
+   * Implements the input, then adds a "sort" entry if there are sort keys, a
+   * "from" entry if there is an offset, and a "size" entry if there is a
+   * fetch. The offset and the fetch are expected to be literals.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    if (!collation.getFieldCollations().isEmpty()) {
+      final List<String> keys = new ArrayList<>();
+      final List<RelDataTypeField> fields = getRowType().getFieldList();
+
+      for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+        final String name = fields.get(fieldCollation.getFieldIndex()).getName();
+        keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
+      }
+
+      // Each key becomes an object of its own: [ {"a": "asc"}, {"b": "desc"}]
+      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
+    }
+
+    if (offset != null) {
+      implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
+    }
+
+    if (fetch != null) {
+      implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
+    }
+  }
+
+  /**
+   * Returns the quoted sort order of a key: "desc" for the descending
+   * directions, "asc" for every other direction.
+   */
+  private String direction(RelFieldCollation fieldCollation) {
+    switch (fieldCollation.getDirection()) {
+    case DESCENDING:
+    case STRICTLY_DESCENDING:
+      return "\"desc\"";
+    case ASCENDING:
+    case STRICTLY_ASCENDING:
+    default:
+      return "\"asc\"";
+    }
+  }
+}
+
+// End ElasticsearchSort.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
new file mode 100644
index 0000000..f3dbca5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.calcite.util.Util;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on an Elasticsearch type.
+ */
+public class ElasticsearchTable extends AbstractQueryableTable implements TranslatableTable {
+  private final Client client;
+  private final String indexName;
+  private final String typeName;
+
+  /**
+   * Creates an ElasticsearchTable.
+   *
+   * @param client Client used to run searches
+   * @param indexName Name of the index
+   * @param typeName Name of the type within the index
+   */
+  public ElasticsearchTable(Client client, String indexName,
+      String typeName) {
+    super(Object[].class);
+    this.client = client;
+    this.indexName = indexName;
+    this.typeName = typeName;
+  }
+
+  @Override public String toString() {
+    return "ElasticsearchTable{" + typeName + "}";
+  }
+
+  /**
+   * Returns a row type with a single field, "_MAP", which is a map from
+   * VARCHAR to nullable ANY.
+   */
+  @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+    final RelDataType keyType = relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    final RelDataType valueType = relDataTypeFactory.createTypeWithNullability(
+        relDataTypeFactory.createSqlType(SqlTypeName.ANY), true);
+    final RelDataType mapType = relDataTypeFactory.createMapType(keyType, valueType);
+    return relDataTypeFactory.builder().add("_MAP", mapType).build();
+  }
+
+  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
+  }
+
+  /** Creates a scan of this table in the Elasticsearch calling convention. */
+  @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    final ElasticsearchTable self = this;
+    return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
+        relOptTable, self, null);
+  }
+
+  /** Executes a "find" operation on the underlying type.
+   *
+   * <p>For example,
+   * <code>client.prepareSearch(index).setTypes(type)
+   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
+   *
+   * <p>The search runs each time an enumerator is requested from the result.
+   *
+   * @param index Elasticsearch index
+   * @param ops List of operations represented as Json strings.
+   * @param fields List of fields to project; or null to return map
+   * @return Enumerable of results
+   */
+  private Enumerable<Object> find(String index, List<String> ops,
+      List<Map.Entry<String, Class>> fields) {
+    // Final copy so that the anonymous class below can refer to it.
+    final String searchIndex = index;
+
+    // The operations become the comma-separated members of one JSON object.
+    final String source = "{" + Util.toString(ops, "", ", ", "") + "}";
+
+    final Function1<SearchHit, Object> hitConverter = ElasticsearchEnumerator.getter(fields);
+
+    return new AbstractEnumerable<Object>() {
+      @Override public Enumerator<Object> enumerator() {
+        final Iterator<SearchHit> hits = client.prepareSearch(searchIndex).setTypes(typeName)
+            .setSource(source).execute().actionGet().getHits().iterator();
+        return new ElasticsearchEnumerator(hits, hitConverter);
+      }
+    };
+  }
+
+  /**
+   * Implementation of {@link org.apache.calcite.linq4j.Queryable} based on
+   * a {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchTable}.
+   */
+  public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
+    public ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        ElasticsearchTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    /** Always returns null; results are obtained through {@link #find}. */
+    @Override public Enumerator<T> enumerator() {
+      return null;
+    }
+
+    /** Returns the index of the enclosing {@link ElasticsearchSchema}. */
+    private String getIndex() {
+      final ElasticsearchSchema elasticsearchSchema = schema.unwrap(ElasticsearchSchema.class);
+      return elasticsearchSchema.index;
+    }
+
+    private ElasticsearchTable getTable() {
+      return (ElasticsearchTable) table;
+    }
+
+    /** Called via code-generation.
+     *
+     * @see org.apache.calcite.adapter.elasticsearch.ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> find(List<String> ops,
+        List<Map.Entry<String, Class>> fields) {
+      final ElasticsearchTable elasticsearchTable = getTable();
+      return elasticsearchTable.find(getIndex(), ops, fields);
+    }
+  }
+}
+
+// End ElasticsearchTable.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
new file mode 100644
index 0000000..636a629
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of an Elasticsearch type.
+ *
+ * <p> Additional operations might be applied,
+ * using the "find" method.</p>
+ */
+public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
+  private final ElasticsearchTable elasticsearchTable;
+  private final RelDataType projectRowType;
+
+  /**
+   * Creates an ElasticsearchTableScan.
+   *
+   * @param cluster Cluster
+   * @param traitSet Trait set
+   * @param table Table
+   * @param elasticsearchTable Elasticsearch table
+   * @param projectRowType Fields and types to project; null to project raw row
+   */
+  protected ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+      ElasticsearchTable elasticsearchTable, RelDataType projectRowType) {
+    super(cluster, traitSet, table);
+    this.elasticsearchTable = elasticsearchTable;
+    this.projectRowType = projectRowType;
+
+    assert elasticsearchTable != null;
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+  }
+
+  /** Returns this scan itself; a scan has no inputs to replace. */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  /** Returns the projected row type if there is one, else the table's row type. */
+  @Override public RelDataType deriveRowType() {
+    if (projectRowType == null) {
+      return super.deriveRowType();
+    }
+    return projectRowType;
+  }
+
+  /**
+   * Returns a tenth of the cost computed by the superclass, scaled further
+   * by one hundredth of the number of projected fields when a projection is
+   * present.
+   */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final float f;
+    if (projectRowType == null) {
+      f = 1f;
+    } else {
+      f = (float) projectRowType.getFieldCount() / 100f;
+    }
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return baseCost.multiplyBy(.1 * f);
+  }
+
+  /** Adds the converter rule and the Elasticsearch rules to the planner. */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(ElasticsearchToEnumerableConverterRule.INSTANCE);
+    final RelOptRule[] rules = ElasticsearchRules.RULES;
+    for (int i = 0; i < rules.length; i++) {
+      planner.addRule(rules[i]);
+    }
+  }
+
+  /** Tells the implementor which table is being scanned. */
+  @Override public void implement(Implementor implementor) {
+    implementor.table = table;
+    implementor.elasticsearchTable = elasticsearchTable;
+  }
+}
+
+// End ElasticsearchTableScan.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
new file mode 100644
index 0000000..adb88f7
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.AbstractList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Relational expression representing a scan of a table in an Elasticsearch data source.
+ */
+public class ElasticsearchToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+ protected ElasticsearchToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new ElasticsearchToEnumerableConverter(getCluster(), traitSet, sole(inputs));
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(.1);
+ }
+
+ @Override public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
+ final BlockBuilder list = new BlockBuilder();
+ final ElasticsearchRel.Implementor elasticsearchImplementor =
+ new ElasticsearchRel.Implementor();
+ elasticsearchImplementor.visitChild(0, getInput());
+ final RelDataType rowType = getRowType();
+ final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
+ prefer.prefer(JavaRowFormat.ARRAY));
+ final Expression fields = list.append("fields",
+ constantArrayList(
+ Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType),
+ new AbstractList<Class>() {
+ @Override public Class get(int index) {
+ return physType.fieldClass(index);
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ }),
+ Pair.class));
+ final Expression table = list.append("table",
+ elasticsearchImplementor.table
+ .getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
+ List<String> opList = elasticsearchImplementor.list;
+ final Expression ops = list.append("ops", constantArrayList(opList, String.class));
+ Expression enumerable = list.append("enumerable",
+ Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
+ fields));
+ if (CalcitePrepareImpl.DEBUG) {
+ System.out.println("Elasticsearch: " + opList);
+ }
+ Hook.QUERY_PLAN.run(opList);
+ list.add(Expressions.return_(null, enumerable));
+ return implementor.result(physType, list.toBlock());
+ }
+
+ /** E.g. {@code constantArrayList("x", "y")} returns
+ * "Arrays.asList('x', 'y')". */
+ private static <T> MethodCallExpression constantArrayList(List<T> values, Class clazz) {
+ return Expressions.call(BuiltInMethod.ARRAYS_AS_LIST.method,
+ Expressions.newArrayInit(clazz, constantList(values)));
+ }
+
+ /** E.g. {@code constantList("x", "y")} returns
+ * {@code {ConstantExpression("x"), ConstantExpression("y")}}. */
+ private static <T> List<Expression> constantList(List<T> values) {
+ return Lists.transform(values,
+ new Function<T, Expression>() {
+ @Nullable
+ @Override public Expression apply(@Nullable T t) {
+ return Expressions.constant(t);
+ }
+ });
+ }
+}
+
+// End ElasticsearchToEnumerableConverter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
new file mode 100644
index 0000000..1047757
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/**
+ * Rule to convert a relational expression from
+ * {@link ElasticsearchRel#CONVENTION} to {@link EnumerableConvention}.
+ */
+public class ElasticsearchToEnumerableConverterRule extends ConverterRule {
+ public static final ConverterRule INSTANCE = new ElasticsearchToEnumerableConverterRule();
+
+ private ElasticsearchToEnumerableConverterRule() {
+ super(RelNode.class, ElasticsearchRel.CONVENTION, EnumerableConvention.INSTANCE,
+ "ElasticsearchToEnumerableConverterRule");
+ }
+
+ @Override public RelNode convert(RelNode relNode) {
+ RelTraitSet newTraitSet = relNode.getTraitSet().replace(getOutConvention());
+ return new ElasticsearchToEnumerableConverter(relNode.getCluster(), newTraitSet, relNode);
+ }
+}
+
+// End ElasticsearchToEnumerableConverterRule.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
new file mode 100644
index 0000000..dad800a
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Query provider based on an Elasticsearch DB.
+ */
+@PackageMarker
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java