You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/01/12 08:43:54 UTC

[GitHub] [drill] vdiravka commented on a change in pull request #2135: DRILL-3637: One more pull request for ElasticSearch storage plugin

vdiravka commented on a change in pull request #2135:
URL: https://github.com/apache/drill/pull/2135#discussion_r555311814



##########
File path: contrib/storage-elasticsearch/pom.xml
##########
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <properties>
+    <test.elasticsearch.version>7.10.1</test.elasticsearch.version>
+  </properties>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-elasticsearch</artifactId>
+
+  <name>contrib/elasticsearch-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${calcite.groupId}</groupId>
+      <artifactId>calcite-elasticsearch</artifactId>
+      <version>${calcite.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion></exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <version>7.0.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.hydromatic</groupId>
+      <artifactId>foodmart-data-json</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount combine.self="override">1</forkCount>

Review comment:
       parallel test execution is not permitted?

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map

Review comment:
       Could you elaborate please how map is different from ITEM calls?

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/CalciteUtils.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.store.elasticsearch.ElasticsearchStorageConfig;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticSearchEnumerablePrelContext;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchFilterRule;
+import org.apache.drill.exec.store.elasticsearch.plan.ElasticsearchProjectRule;
+import org.apache.drill.exec.store.enumerable.plan.EnumerableIntermediatePrelConverterRule;
+import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class CalciteUtils {
+
+  private static final List<String> BANNED_RULES =
+      Arrays.asList("ElasticsearchProjectRule", "ElasticsearchFilterRule");
+
+  public static final Predicate<RelOptRule> RULE_PREDICATE =
+      relOptRule -> BANNED_RULES.stream()
+          .noneMatch(banned -> relOptRule.toString().startsWith(banned));
+
+  public static final VertexDrelConverterRule ELASTIC_DREL_CONVERTER_RULE =
+      new VertexDrelConverterRule(ElasticsearchRel.CONVENTION);
+
+  public static final EnumerableIntermediatePrelConverterRule ENUMERABLE_INTERMEDIATE_PREL_CONVERTER_RULE =
+      new EnumerableIntermediatePrelConverterRule(
+          new ElasticSearchEnumerablePrelContext(ElasticsearchStorageConfig.NAME));
+
+  public static Set<RelOptRule> elasticSearchRules() {
+    Set<RelOptRule> rules = Arrays.stream(ElasticsearchRules.RULES)
+        .filter(RULE_PREDICATE)

Review comment:
       we filter `ElasticsearchProjectRule` and `ElasticsearchFilterRule` rules and then add them to the list again?
   Or we added other versions of these rules?

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStorageConfig.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(ElasticsearchStorageConfig.NAME)
+public class ElasticsearchStorageConfig extends StoragePluginConfig {
+  public static final String NAME = "elastic";
+
+  private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(List.class);
+
+  private final List<String> hosts;
+  private final String username;
+  private final String password;
+
+  @JsonCreator

Review comment:
       What about eliminating `@JsonCreator` and `@JsonProperty(*)` annotations? It is not mandatory, if jdk8 used:
   https://manosnikolaidis.wordpress.com/2015/08/25/jackson-without-annotations/

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;
+    }
+    RelTraitSet traitSet = filter.getTraitSet().replace(out);
+
+    try {
+      CalciteUtils.analyzePredicate(filter.getCondition());
+    } catch (Exception e) {
+      logger.info("Unable to push filter into ElasticSearch :{}", e.getMessage(), e);

Review comment:
       It looks some sort of error, not info. Do you agree?
   Doesn't seem to a regular flow as for me.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/NodeTypeFinder.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.util.Util;
+
+public class NodeTypeFinder extends RelShuttleImpl {

Review comment:
       Do we need it only for ElasticSearch or possibly it makes sense to factor out it to higher Drill planning level package.

##########
File path: contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ElasticSearchPlanTest extends ClusterTest {

Review comment:
       Consider creating now or later the `ElasticSearchClusterTest` test suite including `init()`, `cleanUp()` and `prepareData()` methods.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchProjectRule.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ElasticsearchProjectRule extends ConverterRule {

Review comment:
       What about this rule in ElasticSearch?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
##########
@@ -110,14 +108,14 @@ public void onMatch(RelOptRuleCall call) {
         return;
       }
 
-      DrillScanRelBase newScan = createScan(scan, projectPushInfo);
+      TableScan newScan = createScan(scan, projectPushInfo);
 
       List<RexNode> newProjects = new ArrayList<>();
       for (RexNode n : project.getChildExps()) {
         newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
       }
 
-      DrillProjectRelBase newProject =
+      Project newProject =

Review comment:
       Why not Drill specific implementations? Can we have other implementations here than Drill's ones?

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticsearchFilterRule.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchFilterRule extends ConverterRule {
+  private static final Logger logger = LoggerFactory.getLogger(ElasticsearchFilterRule.class);
+
+  public static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+  private final Convention out;
+
+  private ElasticsearchFilterRule() {
+    super(Filter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "DrillElasticsearchFilterRule");
+    this.out = ElasticsearchRel.CONVENTION;
+  }
+
+  @Override
+  public RelNode convert(RelNode relNode) {
+    Filter filter = (Filter) relNode;
+    NodeTypeFinder filterFinder = new NodeTypeFinder(ElasticsearchFilter.class);
+    filter.getInput().accept(filterFinder);
+    if (filterFinder.containsNode) {
+      return null;

Review comment:
       Do we fail the query here? logger?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/ConvertIntToDecimal.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.convert;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+import java.math.BigDecimal;
+
+public class ConvertIntToDecimal extends DirectConverter {

Review comment:
       I wondered we didn't have these classes earlier. Does that mean autocasting between these types didnt' work? Only casting to String?

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.
+   */
+  private RelNode getMappedInput(RelNode relNode) {
+    boolean hasProject = this.hasProject;
+    this.hasProject = false;
+    RelNode input = relNode.accept(this);
+    if (!this.hasProject) {
+      this.hasProject = hasProject;
+      RelOptCluster cluster = relNode.getCluster();
+      List<RexNode> projections = IntStream.range(0, relNode.getRowType().getFieldCount())
+          .mapToObj(i -> cluster.getRexBuilder().makeInputRef(relNode, i))
+          .collect(Collectors.toList());
+
+      return CalciteUtils.createProject(relNode.getTraitSet(), relNode,
+          projections, relNode.getRowType()).accept(this);
+    } else {
+      return input;
+    }
+  }
+
+  private RelRecordType getRelRecordType(RelDataType rowType) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (RelDataTypeField relDataTypeField : rowType.getFieldList()) {
+      if (relDataTypeField.isDynamicStar()) {
+        fields.add(mapField);
+      } else {
+        fields.add(relDataTypeField);
+      }
+    }
+
+    return new RelRecordType(StructKind.FULLY_QUALIFIED, fields, false);
+  }
+
+  /**
+   * Implementation of RexShuttle that replaces RexInputRef expressions with ITEM calls to _MAP field.
+   */
+  public static class ElasticExpressionMapper extends RexShuttle {
+    private final RexBuilder rexBuilder;
+    private final RelDataType relDataType;
+    private final RelDataTypeField mapField;
+
+    public ElasticExpressionMapper(RexBuilder rexBuilder, RelDataType relDataType, RelDataTypeField mapField) {
+      this.rexBuilder = rexBuilder;
+      this.relDataType = relDataType;
+      this.mapField = mapField;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+      if (inputRef.getType().getSqlTypeName() == SqlTypeName.DYNAMIC_STAR) {
+        return rexBuilder.makeInputRef(mapField.getType(), 0);
+      }
+      return rexBuilder.makeCall(SqlStdOperatorTable.ITEM, rexBuilder.makeInputRef(relDataType, 0),
+          rexBuilder.makeLiteral(relDataType.getFieldNames().get(inputRef.getIndex())));
+    }
+  }
+}

Review comment:
       Could we simplify this class in future and use directly _MAP field instead of ITEM calls?
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
##########
@@ -284,7 +284,7 @@ public static boolean isLimit0(RexNode fetch) {
   public static boolean isProjectOutputRowcountUnknown(Project project) {
     for (RexNode rex : project.getProjects()) {
       if (rex instanceof RexCall) {
-        if ("flatten".equals(((RexCall) rex).getOperator().getName().toLowerCase())) {
+        if ("flatten".equalsIgnoreCase(((RexCall) rex).getOperator().getName())) {

Review comment:
       I think it could be more convenient to keep all operator names in lower case and then their comparison would be easier too.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -203,9 +203,9 @@ public DirectConverter newInstance(
    * <p>
    * Does not support any of the "legacy" decimal types.
    *
-   * @param inputDefn the column schema for the input column which the
+   * @param inputSchema the column schema for the input column which the

Review comment:
       I found that regarding to the `column schema` the `column definition` is used more often in SQL:
   https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_com_query_response_text_resultset_column_definition.html
   https://docs.oracle.com/javadb/10.8.3.0/ref/rrefsqlj30540.html
   http://vb.net-informations.com/dataset/dataset-column-definition-sqlserver.htm
   
   Possibly `inputDefn` -> `inputDefinition` 

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/ElasticsearchStoragePlugin.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.elasticsearch.schema.ElasticsearchDrillSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ElasticsearchStoragePlugin extends AbstractStoragePlugin {
+  private final ElasticsearchStorageConfig config;
+  private final ElasticsearchDrillSchemaFactory schemaFactory;
+
+  public ElasticsearchStoragePlugin(
+      ElasticsearchStorageConfig config, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = config;
+    this.schemaFactory = new ElasticsearchDrillSchemaFactory(name, this);
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws JsonProcessingException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public ElasticsearchStorageConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+        return Collections.emptySet();

Review comment:
       We don't use `RegularImmutableSet` here, because we can want to add some rules later?

##########
File path: contrib/storage-elasticsearch/src/main/resources/bootstrap-storage-plugins.json
##########
@@ -0,0 +1,10 @@
+{
+  "storage":{
+    "elastic" : {
+      "type" : "elastic",
+      "hosts": ["http://localhost:9200"],
+      "username" : null,
+      "password" : null

Review comment:
       Relates to other plugins plain text issue.

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
##########
@@ -78,19 +77,6 @@ private String stripToOneLineSql(String sql) {
     return strippedSqlTextBldr.toString();
   }
 
-  private static class SubsetRemover extends RelShuttleImpl {

Review comment:
       Why removed?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
##########
@@ -144,21 +143,22 @@ public ClassBuilder(DrillConfig config, OptionSet optionManager) {
       saveCode(code, name);
     }
 
+    Class<?> compiledClass = getCompiledClass(code, className, config, options);
+    logger.debug("Compiled {}: time = {} ms.",
+        className,
+        (System.nanoTime() - t1 + 500_000) / 1_000_000);
+    return compiledClass;
+  }
+
+  public static Class<?> getCompiledClass(String code, String className,

Review comment:
       Some issue was here or it is just code refactoring?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/record/ColumnConverter.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Converts and sets given value into the specific column writer.
+ */
+public interface ColumnConverter {

Review comment:
       Why do we need this converter? Looks like we used it only for Avro and now it is needed for ElasticSearch too, right?
   Could you add more description here, when it is needed and when we don't need it. Or add some example, pls?
   
   Can we convert and prepare columns for readers earlier in the `org.apache.drill.exec.physical.impl.scan.convert.ColumnConverter`?

##########
File path: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
##########
@@ -60,7 +61,7 @@
     this.rules = ImmutableSet.<RelOptRule>builder()
         .addAll(calciteJdbcRules)
         .add(JdbcIntermediatePrelConverterRule.INSTANCE)
-        .add(new JdbcDrelConverterRule(this))
+        .add(new VertexDrelConverterRule(this))

Review comment:
       Just renamed because of usage not only for JDBC?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
##########
@@ -210,16 +209,6 @@ protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String
     return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
   }
 
-  public static DrillScanRel findScan(RelNode... rels) {
-    for (RelNode rel : rels) {
-      if (rel instanceof DrillScanRel) {
-        return (DrillScanRel) rel;
-      } else {
-        return findScan(rel.getInputs().toArray(new RelNode[0]));
-      }
-    }
-    return null;
-  }
   // Make sure no unsupported features in ANALYZE statement are used

Review comment:
       Could you improve this class (if you want for sure), it has a lot of method descriptions, they needed to be converted to JavaDocs

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+public interface LeafPrel extends Prel {

Review comment:
       Looks like we have major and minor fragments (leaves) only after transforming the physical plan.
   Do we really want to introduce the leaves on Physical Planning stage?
   
   UPD: Or it is other leaf? So pls add description here

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
##########
@@ -257,8 +258,9 @@ public ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema)
         case BIGINT:
         case FLOAT4:
         case FLOAT8:
-        case VARDECIMAL:
           return IMPLICIT;
+        case VARDECIMAL:
+          return new ConversionDefn(ConvertIntToDecimal.class);

Review comment:
       If you decided to leave `inputSchema` naming, pls consider to rename `ConversionDefn` to `ConversionShema` too to follow the common naming.

##########
File path: contrib/storage-elasticsearch/src/main/java/org/apache/drill/exec/store/elasticsearch/plan/ElasticPlanTransformer.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.elasticsearch.plan;
+
+import org.apache.calcite.adapter.elasticsearch.CalciteUtils;
+import org.apache.calcite.adapter.elasticsearch.DrillElasticsearchTableScan;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchFilter;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchProject;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchSort;
+import org.apache.calcite.adapter.elasticsearch.ElasticsearchTable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of RelShuttleImpl that transforms plan to fit Calcite ElasticSearch rel implementor.
+ */
+public class ElasticPlanTransformer extends RelShuttleImpl {
+
+  private boolean hasProject = false;
+
+  private RelDataTypeField mapField;
+
+  /**
+   * Replaces rowType of RelOptTable by rowType obtained from ElasticsearchTable.
+   */
+  @Override
+  public RelNode visit(TableScan other) {
+    RelOptTableImpl table = (RelOptTableImpl) other.getTable();
+    ElasticsearchTable elasticsearchTable = Objects.requireNonNull(
+        table.unwrap(ElasticsearchTable.class), "ElasticSearch table cannot be null");
+    RelDataType rowType = elasticsearchTable.getRowType(other.getCluster().getTypeFactory());
+    mapField = rowType.getFieldList().get(0);
+    return new DrillElasticsearchTableScan(other.getCluster(), other.getTraitSet(), table.copy(rowType), elasticsearchTable, rowType);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    // replaces project expressions with ITEM calls, since Calcite returns results as a map
+    if (other instanceof ElasticsearchProject) {
+      ElasticsearchProject project = (ElasticsearchProject) other;
+      RelNode input = project.getInput().accept(this);
+      List<RexNode> convertedExpressions = project.getProjects();
+      // project closest to the scan should be rewritten only
+      if (!this.hasProject) {
+        ElasticExpressionMapper expressionMapper =
+            new ElasticExpressionMapper(project.getCluster().getRexBuilder(),
+                project.getInput().getRowType(), mapField);
+        convertedExpressions = convertedExpressions.stream()
+            .map(expression -> expression.accept(expressionMapper))
+            .collect(Collectors.toList());
+
+        RelRecordType relDataType = getRelRecordType(other.getRowType());
+        this.hasProject = true;
+        return CalciteUtils.createProject(project.getTraitSet(), input,
+            convertedExpressions, relDataType);
+      } else {
+        return input;
+      }
+    } else if (other instanceof ElasticsearchFilter) {
+      ElasticsearchFilter filter = (ElasticsearchFilter) other;
+      RexNode convertedCondition = filter.getCondition().accept(
+          new ElasticExpressionMapper(other.getCluster().getRexBuilder(), filter.getInput().getRowType(), mapField));
+      return filter.copy(other.getTraitSet(), filter.getInput().accept(this), convertedCondition);
+    } else if (other instanceof ElasticsearchSort) {
+      ElasticsearchSort sort = (ElasticsearchSort) other;
+      RelNode input = getMappedInput(sort.getInput());
+      return sort.copy(other.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch);
+    } else if (other instanceof ElasticsearchAggregate) {
+      ElasticsearchAggregate aggregate = (ElasticsearchAggregate) other;
+      RelNode input = getMappedInput(aggregate.getInput());
+      return aggregate.copy(other.getTraitSet(), input, aggregate.getGroupSet(),
+          aggregate.getGroupSets(), aggregate.getAggCallList());
+    }
+
+    return super.visit(other);
+  }
+
+  /**
+   * Generates project with mapped expressions above specified rel node
+   * if there is no other project in the tree.

Review comment:
       The final tree from Calcite can be without Project?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/SubsetRemover.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.RelShuttleImpl;
+
+public class SubsetRemover extends RelShuttleImpl {

Review comment:
       I see it is factored out from JdbcPrel. Could you add a short javadoc please?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DrillDataContext.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.util.Map;
+
+public class DrillDataContext implements DataContext {
+  private final SchemaPlus rootSchema;

Review comment:
       Why not use `DataContextImpl` with `CalciteSchema` schema wrapper?
   Is it due to `DataContextImpl` limited to JDBC only in the Calicte?

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
##########
@@ -708,7 +708,7 @@ public void testBasicConversionType() {
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, bigIntCol));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float4Col));
     expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float8Col));
-    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(tinyIntCol, decimalCol));

Review comment:
       Why it is changed?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
##########
@@ -27,19 +25,21 @@
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
 
-public class JdbcDrel extends SingleRel implements DrillRel {
+import java.util.List;
+
+public class VertexDrel extends SingleRel implements DrillRel {

Review comment:
       Could you describe what Vertex Drel means?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org