Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/01/12 23:00:41 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2135: DRILL-3637: One more pull request for ElasticSearch storage plugin

vvysotskyi commented on a change in pull request #2135:
URL: https://github.com/apache/drill/pull/2135#discussion_r556107998



##########
File path: contrib/storage-elasticsearch/pom.xml
##########
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <properties>
+    <test.elasticsearch.version>7.10.1</test.elasticsearch.version>
+  </properties>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-elasticsearch</artifactId>
+
+  <name>contrib/elasticsearch-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-elasticsearch</artifactId>
+      <version>${calcite.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion></exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <version>7.0.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.hydromatic</groupId>
+      <artifactId>foodmart-data-json</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount combine.self="override">1</forkCount>

Review comment:
       Elastic tests run against a real Elasticsearch instance started by the `elasticsearch-maven-plugin`. Since every test class prepares its own data, running tests concurrently could corrupt that data and cause random failures, so the tests run in a single thread.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(ElasticsearchStorageConfig.NAME)
+public class ElasticsearchStorageConfig extends StoragePluginConfig {
+  public static final String NAME = "elastic";
+
+  private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(List.class);
+
+  private final List<String> hosts;
+  private final String username;
+  private final String password;
+
+  @JsonCreator

Review comment:
       We would need to satisfy the prerequisites to be able to do that. From the article:
   
   > The requirements for that are :
   > 
   > 1. JDK 1.8
   > 2. compile with -parameters argument
   > 3. use and register jackson-module-parameter-names
   
   Currently, we have satisfied only the first one...
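
   For illustration only (a sketch based on the fields shown in the diff, not the exact PR code): with the current setup the constructor has to name every property explicitly, whereas the parameter-names module would only let us drop those annotations once items 2 and 3 above are in place.

   ```java
   import java.util.List;

   import com.fasterxml.jackson.annotation.JsonCreator;
   import com.fasterxml.jackson.annotation.JsonProperty;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;

   // Minimal stand-in for the config class, just to contrast the two styles.
   class ConfigExample {
     final List<String> hosts;
     final String username;
     final String password;

     // Current style: property names are declared explicitly via @JsonProperty,
     // so neither the '-parameters' compiler flag nor an extra module is needed.
     @JsonCreator
     ConfigExample(@JsonProperty("hosts") List<String> hosts,
                   @JsonProperty("username") String username,
                   @JsonProperty("password") String password) {
       this.hosts = hosts;
       this.username = username;
       this.password = password;
     }

     // Annotation-free constructors would additionally require compiling with
     // '-parameters' and registering the module on the mapper:
     static ObjectMapper mapperWithParameterNames() {
       return new ObjectMapper().registerModule(new ParameterNamesModule());
     }
   }
   ```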

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/CalciteUtils.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.store.elasticsearch.ElasticsearchStorageConfig;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticSearchEnumerablePrelContext;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchFilterRule;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchProjectRule;
+import org.apache.drill.exec.store.enumerable.plan.EnumerableIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class CalciteUtils {
+
+  private static final List<String> BANNED_RULES =
+      Arrays.asList("ElasticsearchProjectRule", "ElasticsearchFilterRule");
+
+  public static final Predicate<RelOptRule> RULE_PREDICATE =
+      relOptRule -> BANNED_RULES.stream()
+          .noneMatch(banned -> relOptRule.toString().startsWith(banned));
+
+  public static final VertexDrelConverterRule ELASTIC_DREL_CONVERTER_RULE =
+      new VertexDrelConverterRule(ElasticsearchRel.CONVENTION);
+
+  public static final EnumerableIntermediatePrelConverterRule ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE =
+      new EnumerableIntermediatePrelConverterRule(
+          new ElasticSearchEnumerablePrelContext(ElasticsearchStorageConfig.NAME));
+
+  public static Set<RelOptRule> elasticSearchRules() {
+    Set<RelOptRule> rules = Arrays.stream(ElasticsearchRules.RULES)
+        .filter(RULE_PREDICATE)

Review comment:
       We filter out the Calcite implementations of these rules and add our custom versions later.
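
   Roughly, the rest of the method continues like this (a hedged sketch based on the imports above; the exact rule instances in the PR may differ):

   ```java
   public static Set<RelOptRule> elasticSearchRules() {
     // Start from Calcite's rule set, dropping its own project/filter rules...
     Set<RelOptRule> rules = Arrays.stream(ElasticsearchRules.RULES)
         .filter(RULE_PREDICATE)
         .collect(Collectors.toSet());
     // ...then add Drill's custom versions and the converter rules instead.
     rules.add(ElasticsearchProjectRule.INSTANCE);
     rules.add(ElasticsearchFilterRule.INSTANCE);
     rules.add(ELASTIC_DREL_CONVERTER_RULE);
     rules.add(ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE);
     return rules;
   }
   ```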

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
##########
@@ -60,7 +61,7 @@
     this.rules = ImmutableSet.<RelOptRule>builder()
         .addAll(calciteJdbcRules)
         .add(JdbcIntermediatePrelConverterRule.INSTANCE)
-        .add(new JdbcDrelConverterRule(this))
+        .add(new VertexDrelConverterRule(this))

Review comment:
       Yes, this rule is used for several plugins.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
##########
@@ -144,21 +143,22 @@ public ClassBuilder(DrillConfig config, OptionSet optionManager) {
       saveCode(code, name);
     }
 
+    Class<?> compiledClass = getCompiledClass(code, className, config, options);
+    logger.debug("Compiled {}: time = {} ms.",
+        className,
+        (System.nanoTime() - t1 + 500_000) / 1_000_000);
+    return compiledClass;
+  }
+
+  public static Class<?> getCompiledClass(String code, String className,

Review comment:
       I had to refactor this class so that its logic can be reused in `EnumerableRecordReader` instead of being duplicated.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/ConvertIntToDecimal.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.convert;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.math.BigDecimal;
+
+public class ConvertIntToDecimal extends DirectConverter {

Review comment:
       It is not for auto casting. It is used to convert values of int type to decimal when using `ColumnConverter`, since this cannot be done implicitly due to class cast exceptions.
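
   The class body is elided in the diff above; conceptually it is just a thin adapter along these lines (a sketch, not necessarily the exact PR code):

   ```java
   public class ConvertIntToDecimal extends DirectConverter {

     public ConvertIntToDecimal(ScalarWriter baseWriter) {
       super(baseWriter);
     }

     @Override
     public void setInt(int value) {
       // Widen the int to BigDecimal before handing it to the decimal writer,
       // so the underlying column writer never sees a mismatched Java type.
       baseWriter.setDecimal(BigDecimal.valueOf(value));
     }
   }
   ```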

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/SubsetRemover.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.RelShuttleImpl;
+
+public class SubsetRemover extends RelShuttleImpl {

Review comment:
       Thanks, added.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
##########
@@ -110,14 +108,14 @@ public void onMatch(RelOptRuleCall call) {
         return;
       }
 
-      DrillScanRelBase newScan = createScan(scan, projectPushInfo);
+      TableScan newScan = createScan(scan, projectPushInfo);
 
       List<RexNode> newProjects = new ArrayList<>();
       for (RexNode n : project.getChildExps()) {
         newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
       }
 
-      DrillProjectRelBase newProject =
+      Project newProject =

Review comment:
       With this change, the rule may be applicable to other implementations as well; there is no code here that specifically requires the Drill implementations, so we shouldn't force it to use them.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/NodeTypeFinder.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.util.Util;
+
+public class NodeTypeFinder extends RelShuttleImpl {

Review comment:
       Currently, we use it only for ElasticSearch. If someone needs this class elsewhere, it can be moved to a common package later.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.

Review comment:
       Drill can generate a plan that does not contain an elastic project, so we ensure that one is present.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;
+    }
+    RelTraitSet traitSet = filter.getTraitSet().replace(out);
+
+    try {
+      CalciteUtils.analyzePredicate(filter.getCondition());
+    } catch (Exception e) {
+      logger.info("Unable to push filter into ElasticSearch :{}", e.getMessage(), e);

Review comment:
       No, it is not an error. We just check whether Calcite can convert the filter to an Elasticsearch filter.
   Calcite forces us into the exceptions-as-control-flow antipattern here...

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;

Review comment:
       No, we don't fail the query here; we just do not convert the filter to the elastic filter.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map

Review comment:
       Thanks, reworded.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.
+   */
+  private RelNode getMappedInput(RelNode relNode) {
+    boolean hasProject = this.hasProject;
+    this.hasProject = false;
+    RelNode input = relNode.accept(this);
+    if (!this.hasProject) {
+      this.hasProject = hasProject;
+      RelOptCluster cluster = relNode.getCluster();
+      List<RexNode> projections = IntStream.range(0, relNode.getRowType().getFieldCount())
+          .mapToObj(i -> cluster.getRexBuilder().makeInputRef(relNode, i))
+          .collect(Collectors.toList());
+
+      return CalciteUtils.createProject(relNode.getTraitSet(), relNode,
+          projections, relNode.getRowType()).accept(this);
+    } else {
+      return input;
+    }
+  }
+
+  private RelRecordType getRelRecordType(RelDataType rowType) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (RelDataTypeField relDataTypeField : rowType.getFieldList()) {
+      if (relDataTypeField.isDynamicStar()) {
+        fields.add(mapField);
+      } else {
+        fields.add(relDataTypeField);
+      }
+    }
+
+    return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false);
+  }
+
+  /**
+   * Implementation of RexShuttle that replaces RexInputRef expressions with ITEM calls to _MAP field.
+   */
+  public static class ElasticExpressionMapper extends RexShuttle {
+    private final RexBuilder rexBuilder;
+    private final RelDataType relDataType;
+    private final RelDataTypeField mapField;
+
+    public ElasticExpressionMapper(RexBuilder rexBuilder, RelDataType relDataType, RelDataTypeField mapField) {
+      this.rexBuilder = rexBuilder;
+      this.relDataType = relDataType;
+      this.mapField = mapField;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+      if (inputRef.getType().getSqlTypeName() == SqlTypeName.DYNAMIC_STAR) {
+        return rexBuilder.makeInputRef(mapField.getType(), 0);
+      }
+      return rexBuilder.makeCall(SqlStdOperatorTable.ITEM, rexBuilder.makeInputRef(relDataType, 0),
+          rexBuilder.makeLiteral(relDataType.getFieldNames().get(inputRef.getIndex())));
+    }
+  }
+}

Review comment:
       No, it can't be simplified. Returning the `_MAP` field instead of generating `ITEM` calls would force users to write these `ITEM` calls in their queries themselves to access the desired fields.

##########
File path: contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ElasticSearchPlanTest extends ClusterTest {

Review comment:
       I'm not sure that `ElasticSearchClusterTest` fits here, since we start ElasticSearch not from the Java code but from the Maven plugin. Also, every test class has its own generated data.
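
   For context, the per-class setup is plain REST-client calls of roughly this shape (an illustrative sketch using the imports quoted above; index and field names are made up):

   ```java
   private static void prepareTestData(RestHighLevelClient client) throws IOException {
     // Each test class creates its own index and documents before the tests run.
     client.indices().create(new CreateIndexRequest("employee"), RequestOptions.DEFAULT);

     XContentBuilder doc = XContentFactory.jsonBuilder()
         .startObject()
         .field("first_name", "Sheri")
         .endObject();
     client.index(new IndexRequest("employee").source(doc), RequestOptions.DEFAULT);

     // Make the documents visible to subsequent searches.
     client.indices().refresh(new RefreshRequest("employee"), RequestOptions.DEFAULT);
   }
   ```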

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchProjectRule.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ElasticsearchProjectRule extends ConverterRule {

Review comment:
       The corresponding rule from the ElasticSearch adapter is much simpler: it just converts the projection expressions to Elasticsearch expressions, which may fail later. But here we have logic to split the project when it contains expressions. Also, by using this rule we avoid the https://issues.apache.org/jira/browse/CALCITE-4440 issue.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
##########
@@ -78,19 +77,6 @@ private String stripToOneLineSql(String sql) {
     return strippedSqlTextBldr.toString();
   }
 
-  private static class SubsetRemover extends RelShuttleImpl {

Review comment:
       It was moved to the java-exec module so that it can be used in other storage plugins.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -257,8 +258,9 @@ public ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema)
         case BIGINT:
         case FLOAT4:
         case FLOAT8:
-        case VARDECIMAL:
           return IMPLICIT;
+        case VARDECIMAL:
+          return new ConversionDefn(ConvertIntToDecimal.class);

Review comment:
       These are two different terms. The existing `ConversionDefn` is the "definition of a conversion", and that's what the class represents. But the argument is a column schema, which is also fine. So leaving it as it is.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -203,9 +203,9 @@ public DirectConverter newInstance(
    * <p>
    * Does not support any of the "legacy" decimal types.
    *
-   * @param inputDefn the column schema for the input column which the
+   * @param inputSchema the column schema for the input column which the

Review comment:
       `inputSchema` fits better for Drill, since we have our own terms and use them widely in the documentation. Please note that I've just updated the JavaDoc to match the actual method parameters.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
##########
@@ -27,19 +25,21 @@
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
 
-public class JdbcDrel extends SingleRel implements DrillRel {
+import java.util.List;
+
+public class VertexDrel extends SingleRel implements DrillRel {

Review comment:
       Thanks, added javadoc.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public interface LeafPrel extends Prel {

Review comment:
       It is not related to fragments. It is a prel without children, hence a leaf. Thanks, added JavaDoc.
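
   The interface body is elided in the diff above; given the imports it is presumably little more than this (a sketch, not necessarily the exact PR code):

   ```java
   public interface LeafPrel extends Prel {

     @Override
     default Iterator<Prel> iterator() {
       // A leaf prel has no child prels to iterate over.
       return Collections.emptyIterator();
     }
   }
   ```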

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Converts and sets given value into the specific column writer.
+ */
+public interface ColumnConverter {

Review comment:
       `ColumnConverter` is a handy way of converting values and setting them into a specific column writer. Yes, we also use it for ElasticSearch. Please take a look at the JavaDocs of its implementations; they provide all the required additional info.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
##########
@@ -284,7 +284,7 @@ public static boolean isLimit0(RexNode fetch) {
   public static boolean isProjectOutputRowcountUnknown(Project project) {
     for (RexNode rex : project.getProjects()) {
       if (rex instanceof RexCall) {
-        if ("flatten".equals(((RexCall) rex).getOperator().getName().toLowerCase())) {
+        if ("flatten".equalsIgnoreCase(((RexCall) rex).getOperator().getName())) {

Review comment:
       We can't control the casing of the expressions; some of them are generated by Calcite.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.elasticsearch.schema.ElasticsearchDrillSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
+  private final ElasticsearchStorageConfig config;
+  private final ElasticsearchDrillSchemaFactory schemaFactory;
+
+  public ElasticsearchStoragePlugin(
+      ElasticsearchStorageConfig config, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = config;
+    this.schemaFactory = new ElasticsearchDrillSchemaFactory(name, this);
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public ElasticsearchStorageConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+        return Collections.emptySet();

Review comment:
       Oh, no, that was left over from experimenting with the rules. I've reordered the switch branches so that we use `Collections.emptySet()`, which is actually also immutable.
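
   Roughly what the reordered method ends up looking like (a hedged sketch; the exact set of phases handled in the PR may differ):

   ```java
   @Override
   public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
     switch (phase) {
       case LOGICAL:
       case PHYSICAL:
         // Phases where the ElasticSearch pushdown and converter rules apply.
         return CalciteUtils.elasticSearchRules();
       default:
         // Pruning phases and everything else: no extra rules.
         return Collections.emptySet();
     }
   }
   ```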

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
##########
@@ -708,7 +708,7 @@ public void testBasicConversionType() {
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, bigIntCol));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float4Col));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float8Col));
-    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(tinyIntCol, decimalCol));

Review comment:
       Because it is actually impossible for the column writers to convert an int value to decimal implicitly.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DrillDataContext.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.util.Map;
+
+public class DrillDataContext implements DataContext {
+  private final SchemaPlus rootSchema;

Review comment:
       `DataContextImpl` is not public, so it can't be used. `DataContext.getRootSchema()` returns `SchemaPlus`, so there is no need to hold a `CalciteSchema`.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
##########
@@ -210,16 +209,6 @@ protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String
     return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
   }
 
-  public static DrillScanRel findScan(RelNode... rels) {
-    for (RelNode rel : rels) {
-      if (rel instanceof DrillScanRel) {
-        return (DrillScanRel) rel;
-      } else {
-        return findScan(rel.getInputs().toArray(new RelNode[0]));
-      }
-    }
-    return null;
-  }
   // Make sure no unsupported features in ANALYZE statement are used

Review comment:
       I think it should be done outside of this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org