Posted to commits@drill.apache.org by vi...@apache.org on 2018/06/22 21:14:40 UTC

[drill] 09/09: DRILL-6454: Native MapR DB plugin support for Hive MapR-DB json table

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b92f5996eeb9bc41c23c0faddb65b248c7cc8c4f
Author: Vitalii Diravka <vi...@gmail.com>
AuthorDate: Thu May 31 20:39:44 2018 -0700

    DRILL-6454: Native MapR DB plugin support for Hive MapR-DB json table
    
    closes #1314
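    
    A minimal, hypothetical usage sketch (not part of this patch), modeled on the
    TestHiveDrillNativeParquetReader changes below. It shows how the new
    store.hive.maprdb_json.optimize_scan_with_native_reader option gates the native reader;
    the class name, table name and the HiveTestBase import are illustrative assumptions:
    
        package org.apache.drill.exec;
    
        import org.apache.drill.exec.hive.HiveTestBase;
        import org.junit.AfterClass;
        import org.junit.BeforeClass;
        import org.junit.Test;
    
        public class TestHiveDrillNativeMapRDBJsonReader extends HiveTestBase {
    
          @BeforeClass
          public static void init() {
            // Enable the native MapR-DB JSON reader added by this change (default is false).
            setSessionOption(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER, true);
          }
    
          @AfterClass
          public static void cleanup() {
            // Restore the default so other tests run against the stock Hive reader.
            resetSessionOption(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
          }
    
          @Test
          public void testNativeMapRDBJsonScanIsChosen() throws Exception {
            // Assumed Hive MapR-DB JSON table name; checks that the plan contains the native
            // JsonTableGroupScan and no longer contains the Hive scan, using the
            // plan-verification helper inherited from the test base (assumed available).
            String query = "SELECT * FROM hive.`maprdb_json_table`";
            testPlanMatchingPatterns(query, new String[]{"JsonTableGroupScan"}, new String[]{"HiveScan"});
          }
        }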
---
 .../drill/exec/store/mapr/TableFormatPlugin.java   |  11 +-
 .../exec/store/mapr/db/MapRDBFormatMatcher.java    |  18 +-
 .../drill/exec/store/mapr/db/MapRDBGroupScan.java  |   8 +-
 .../store/mapr/db/MapRDBPushFilterIntoScan.java    |   2 +-
 .../store/mapr/db/binary/BinaryTableGroupScan.java |   8 +-
 .../store/mapr/db/json/JsonTableGroupScan.java     |   6 +-
 contrib/storage-hive/core/pom.xml                  |   5 +
 ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 186 +++++++++++++++++++++
 .../ConvertHiveParquetScanToDrillParquetScan.java  | 116 +------------
 .../exec/store/hive/HiveMetadataProvider.java      |   2 +-
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  16 +-
 .../drill/exec/store/hive/HiveTableWrapper.java    |   7 +-
 .../drill/exec/store/hive/HiveUtilities.java       | 149 ++++++++++++++---
 .../exec/TestHiveDrillNativeParquetReader.java     |   4 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   8 +
 .../exec/server/options/SystemOptionManager.java   |   2 +
 .../java-exec/src/main/resources/drill-module.conf |   2 +
 17 files changed, 378 insertions(+), 172 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
index 0d983bd..be86b4c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -45,20 +46,20 @@ public abstract class TableFormatPlugin implements FormatPlugin {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
       .getLogger(TableFormatPlugin.class);
 
-  private final FileSystemConfig storageConfig;
+  private final StoragePluginConfig storageConfig;
   private final TableFormatPluginConfig config;
   private final Configuration fsConf;
   private final DrillbitContext context;
   private final String name;
 
-  private volatile FileSystemPlugin storagePlugin;
+  private volatile AbstractStoragePlugin storagePlugin;
   private final MapRFileSystem maprfs;
 
   protected TableFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig, TableFormatPluginConfig formatConfig) {
     this.context = context;
     this.config = formatConfig;
-    this.storageConfig = (FileSystemConfig) storageConfig;
+    this.storageConfig = storageConfig;
     this.fsConf = fsConf;
     this.name = name == null ? "maprdb" : name;
     try {
@@ -119,10 +120,10 @@ public abstract class TableFormatPlugin implements FormatPlugin {
     return name;
   }
 
-  public synchronized FileSystemPlugin getStoragePlugin() {
+  public synchronized AbstractStoragePlugin getStoragePlugin() {
     if (this.storagePlugin == null) {
       try {
-        this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
+        this.storagePlugin = (AbstractStoragePlugin) context.getStorage().getPlugin(storageConfig);
       } catch (ExecutionSetupException e) {
         throw new RuntimeException(e);
       }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index 4ce6b7d..d4978b9 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.store.mapr.db;
 
 import java.io.IOException;
-import java.util.List;
 
+import com.mapr.fs.MapRFileStatus;
 import com.mapr.fs.tables.TableProperties;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -31,7 +31,6 @@ import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.mapr.TableFormatMatcher;
 import org.apache.drill.exec.store.mapr.TableFormatPlugin;
 
-import com.mapr.fs.MapRFileStatus;
 import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable;
 import org.apache.hadoop.fs.Path;
 
@@ -54,19 +53,16 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
   public DrillTable isReadable(DrillFileSystem fs,
                                FileSelection selection, FileSystemPlugin fsPlugin,
                                String storageEngineName, SchemaConfig schemaConfig) throws IOException {
-
     if (isFileReadable(fs, selection.getFirstPath(fs))) {
-      List<String> files = selection.getFiles();
-      assert (files.size() == 1);
-      String tableName = files.get(0);
-      TableProperties props = getFormatPlugin().getMaprFS().getTableProperties(new Path(tableName));
-
+      MapRDBFormatPlugin mapRDBFormatPlugin = (MapRDBFormatPlugin) getFormatPlugin();
+      String tableName = mapRDBFormatPlugin.getTableName(selection);
+      TableProperties props = mapRDBFormatPlugin.getMaprFS().getTableProperties(new Path(tableName));
       if (props.getAttr().getJson()) {
         return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
-            new FormatSelection(getFormatPlugin().getConfig(), selection));
+            new FormatSelection(mapRDBFormatPlugin.getConfig(), selection));
       } else {
-        FormatSelection formatSelection = new FormatSelection(getFormatPlugin().getConfig(), selection);
-        return new MapRDBBinaryTable(storageEngineName, fsPlugin, (MapRDBFormatPlugin) getFormatPlugin(), formatSelection);
+        FormatSelection formatSelection = new FormatSelection(mapRDBFormatPlugin.getConfig(), selection);
+        return new MapRDBBinaryTable(storageEngineName, fsPlugin, mapRDBFormatPlugin, formatSelection);
       }
     }
     return null;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index effdcde..927bd70 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -35,8 +35,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,7 +49,7 @@ import com.google.common.collect.Sets;
 public abstract class MapRDBGroupScan extends AbstractGroupScan {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
 
-  protected FileSystemPlugin storagePlugin;
+  protected AbstractStoragePlugin storagePlugin;
 
   protected MapRDBFormatPlugin formatPlugin;
 
@@ -84,7 +84,7 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
     this.filterPushedDown = that.filterPushedDown;
   }
 
-  public MapRDBGroupScan(FileSystemPlugin storagePlugin,
+  public MapRDBGroupScan(AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
     super(userName);
     this.storagePlugin = storagePlugin;
@@ -254,7 +254,7 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {
   }
 
   @JsonIgnore
-  public FileSystemPlugin getStoragePlugin(){
+  public AbstractStoragePlugin getStoragePlugin(){
     return storagePlugin;
   }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index 8655f5b..e2ba238 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -144,7 +144,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
 
     // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
-    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
 
     if (jsonConditionBuilder.isAllExpressionsConverted()) {
         /*
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index 16b979e..216f05e 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -30,9 +30,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
 import org.apache.drill.exec.store.hbase.HBaseUtils;
@@ -78,19 +78,19 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
                               @JsonProperty("columns") List<SchemaPath> columns,
                               @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
     this (userName,
-          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
           (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
           scanSpec, columns);
   }
 
-  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
     super(storagePlugin, formatPlugin, columns, userName);
     this.hbaseScanSpec = scanSpec;
     init();
   }
 
-  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
                               MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec,
                               List<SchemaPath> columns, MapRDBTableStats tableStats) {
     super(storagePlugin, formatPlugin, columns, userName);
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index a1d7f9a..975f1b8 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -30,9 +30,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
@@ -71,12 +71,12 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
                             @JsonProperty("columns") List<SchemaPath> columns,
                             @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
     this (userName,
-          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
           (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
           scanSpec, columns);
   }
 
-  public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
                             MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
     super(storagePlugin, formatPlugin, columns, userName);
     this.scanSpec = scanSpec;
diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index 0f7ff1f..e0dbb5a 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -142,6 +142,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-format-mapr</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
new file mode 100644
index 0000000..50fee9c
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.ojai.DocumentConstants;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.store.hive.HiveUtilities.nativeReadersRuleMatches;
+
+/**
+ * Convert Hive scan to use Drill's native MapR-DB reader instead of Hive's MapR-DB JSON Handler.
+ */
+public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePluginOptimizerRule {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.class);
+
+  public static final ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan INSTANCE =
+      new ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan();
+
+  /**
+   * The constants from org.apache.hadoop.hive.maprdb.json.conf.MapRDBConstants
+   */
+  private static final String MAPRDB_PFX = "maprdb.";
+  private static final String MAPRDB_TABLE_NAME = MAPRDB_PFX + "table.name";
+  private static final String ID_KEY = DocumentConstants.ID_KEY;
+  private static final String MAPRDB_COLUMN_ID = MAPRDB_PFX + "column.id";
+
+  private ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan() {
+    super(RelOptHelper.any(DrillScanRel.class), "ConvertHiveScanToHiveDrillNativeScan:MapR-DB");
+  }
+
+  /**
+   * {@link org.apache.drill.exec.store.hive.HiveUtilities#nativeReadersRuleMatches}
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    try {
+      return nativeReadersRuleMatches(call,
+          Class.forName("org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat"));
+    } catch (ClassNotFoundException e) {
+      throw UserException.resourceError(e)
+          .message("Current Drill build is not designed for working with Hive MapR-DB tables. " +
+              "Please disable \"%s\" option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)
+          .build(logger);
+    }
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    try {
+      DrillScanRel hiveScanRel = call.rel(0);
+
+      HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+      HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
+      HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
+          hiveScan.getStoragePlugin().getHiveConf());
+      if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
+        // table is empty, use original scan
+        return;
+      }
+
+      if (hiveScan.getHiveReadEntry().getTable().isSetPartitionKeys()) {
+        logger.warn("Hive MapR-DB JSON Handler doesn't support table partitioning. Consider recreating table without " +
+            "partitions");
+      }
+
+      DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel);
+      call.transformTo(nativeScanRel);
+
+      /*
+        Drill native scan should take precedence over Hive since it's more efficient and faster.
+        Hive does not always give correct costing (i.e. for external tables Hive does not have number of rows
+        and we calculate them approximately). On the contrary, Drill calculates number of rows exactly
+        and thus Hive Scan can be chosen instead of Drill native scan because costings allegedly lower for Hive.
+        To ensure Drill MapR-DB Json scan will be chosen, reduce Hive scan importance to 0.
+       */
+      call.getPlanner().setImportance(hiveScanRel, 0.0);
+    } catch (final Exception e) {
+      logger.warn("Failed to convert HiveScan to JsonScanSpec", e);
+    }
+  }
+
+  /**
+   * Helper method which creates a DrillScanRel with a native Drill MapR-DB JSON group scan.
+   */
+  private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) throws Exception {
+    RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
+    HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+    Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters();
+
+    JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null);
+    MapRDBFormatPlugin mapRDBFormatPlugin = new MapRDBFormatPlugin(
+        "hive-maprdb",
+        hiveScan.getStoragePlugin().getContext(),
+        hiveScan.getHiveConf(),
+        hiveScan.getStoragePlugin().getConfig(),
+        new MapRDBFormatPluginConfig()
+    );
+    List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
+        .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
+        .collect(Collectors.toList());
+    JsonTableGroupScan nativeMapRDBScan =
+        new JsonTableGroupScan(
+            hiveScan.getUserName(),
+            hiveScan.getStoragePlugin(),
+            mapRDBFormatPlugin,
+            scanSpec,
+            hiveScanCols
+        );
+
+    List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream()
+        .map(field -> replaceOverriddenColumnId(parameters, field.getName()))
+        .collect(Collectors.toList());
+    List<RelDataType> nativeScanColTypes = hiveScanRel.getRowType().getFieldList().stream()
+        .map(RelDataTypeField::getType)
+        .collect(Collectors.toList());
+    RelDataType nativeScanRowType = typeFactory.createStructType(nativeScanColTypes, nativeScanColNames);
+
+    return new DrillScanRel(
+        hiveScanRel.getCluster(),
+        hiveScanRel.getTraitSet(),
+        hiveScanRel.getTable(),
+        nativeMapRDBScan,
+        nativeScanRowType,
+        hiveScanCols);
+  }
+
+  /**
+   * Hive maps the MapR-DB "_id" column id to a custom user column name. Replace it back with "_id" for the {@link DrillScanRel}.
+   *
+   * @param parameters Hive table properties
+   * @param colName Hive column name
+   * @return original column name, null if colName is absent
+   */
+  private String replaceOverriddenColumnId(Map<String, String> parameters, String colName) {
+    return colName != null && colName.equals(parameters.get(MAPRDB_COLUMN_ID)) ? ID_KEY : colName;
+  }
+
+  /**
+   * The same as above, but for {@link SchemaPath} object
+   *
+   * @param parameters Hive table properties
+   * @param colNameSchemaPath SchemaPath with Hive column name
+   * @return SchemaPath with original column name
+   */
+  private SchemaPath replaceOverriddenSchemaPath(Map<String, String> parameters, SchemaPath colNameSchemaPath) {
+    String hiveColumnName = colNameSchemaPath.getRootSegmentPath();
+    return hiveColumnName != null && hiveColumnName.equals(parameters.get(MAPRDB_COLUMN_ID))
+        ? SchemaPath.getSimplePath(ID_KEY) : colNameSchemaPath;
+  }
+}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 3484ab3..2a2f4fb 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.sql.logical;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -40,22 +39,14 @@ import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
-import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
-import org.apache.drill.exec.store.hive.HiveUtilities;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+
+import static org.apache.drill.exec.store.hive.HiveUtilities.nativeReadersRuleMatches;
 
 /**
  * Convert Hive scan to use Drill's native parquet reader instead of Hive's native reader. It also adds a
@@ -78,94 +69,11 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
   }
 
   /**
-   * Rule is matched when all of the following match:
-   * 1) GroupScan in given DrillScalRel is an {@link HiveScan}
-   * 2) {@link HiveScan} is not already rewritten using Drill's native readers
-   * 3) InputFormat in Hive table metadata and all partitions metadata contains the same value
-   *    {@link MapredParquetInputFormat}
-   * 4) No error occurred while checking for the above conditions. An error is logged as warning.
-   *
-   * @param call rule call
-   * @return True if the rule can be applied. False otherwise
+   * {@link org.apache.drill.exec.store.hive.HiveUtilities#nativeReadersRuleMatches}
    */
   @Override
   public boolean matches(RelOptRuleCall call) {
-    final DrillScanRel scanRel = call.rel(0);
-
-    if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
-      return false;
-    }
-
-    final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
-    final HiveConf hiveConf = hiveScan.getHiveConf();
-    final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
-
-    if (containsUnsupportedDataTypes(hiveTable)) {
-      return false;
-    }
-
-    final Class<? extends InputFormat<?,?>> tableInputFormat =
-        getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(),
-            hiveConf);
-    if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
-      return false;
-    }
-
-    final List<HivePartitionWrapper> partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers();
-    if (partitions == null) {
-      return true;
-    }
-
-    final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
-    // Make sure all partitions have the same input format as the table input format
-    for (HivePartitionWrapper partition : partitions) {
-      final StorageDescriptor partitionSD = partition.getPartition().getSd();
-      Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(
-          HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD,
-          hiveConf);
-      if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
-        return false;
-      }
-
-      // Make sure the schema of the table and schema of the partition matches. If not return false. Schema changes
-      // between table and partition can happen when table schema is altered using ALTER statements after some
-      // partitions are already created. Currently native reader conversion doesn't handle schema changes between
-      // partition and table. Hive has extensive list of convert methods to convert from one type to rest of the
-      // possible types. Drill doesn't have the similar set of methods yet.
-      if (!partitionSD.getCols().equals(tableSchema)) {
-        logger.debug("Partitions schema is different from table schema. Currently native reader conversion can't " +
-            "handle schema difference between partitions and table");
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Get the input format from given {@link StorageDescriptor}
-   * @param properties table properties
-   * @param hiveReadEntry hive read entry
-   * @param sd storage descriptor
-   * @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
-   */
-  private Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties,
-      final HiveReadEntry hiveReadEntry, final StorageDescriptor sd, final HiveConf hiveConf) {
-    final Table hiveTable = hiveReadEntry.getTable();
-    try {
-      final String inputFormatName = sd.getInputFormat();
-      if (!Strings.isNullOrEmpty(inputFormatName)) {
-        return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormatName);
-      }
-
-      final JobConf job = new JobConf(hiveConf);
-      HiveUtilities.addConfToJob(job, properties);
-      return HiveUtilities.getInputFormatClass(job, sd, hiveTable);
-    } catch (final Exception e) {
-      logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
-          hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
-      return null;
-    }
+    return nativeReadersRuleMatches(call, MapredParquetInputFormat.class);
   }
 
   @Override
@@ -203,7 +111,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
         Hive does not always give correct costing (i.e. for external tables Hive does not have number of rows
         and we calculate them approximately). On the contrary, Drill calculates number of rows exactly
         and thus Hive Scan can be chosen instead of Drill native scan because costings allegedly lower for Hive.
-        To ensure Drill native scan we'll be chosen, reduce Hive scan importance to 0.
+        To ensure Drill native scan will be chosen, reduce Hive scan importance to 0.
        */
       call.getPlanner().setImportance(hiveScanRel, 0.0);
     } catch (final Exception e) {
@@ -341,18 +249,4 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
 
     return rb.makeCast(outputType, inputRef);
   }
-
-  private boolean containsUnsupportedDataTypes(final Table hiveTable) {
-    for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
-      final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
-      if (category == Category.MAP ||
-          category == Category.STRUCT ||
-          category == Category.UNION ||
-          category == Category.LIST) {
-        logger.debug("Hive table contains unsupported data type: {}", category);
-        return true;
-      }
-    }
-    return false;
-  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index c877564..6da6c40 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -375,7 +375,7 @@ public class HiveMetadataProvider {
     }
 
     /**
-     * @return collection of unique locations where inout splits are stored
+     * @return collection of unique locations where input splits are stored
      */
     public Collection<String> getLocations() throws IOException {
       Set<String> locations = new HashSet<>();
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 449a6f9..ced8b01 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -34,9 +34,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -165,14 +167,18 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+    ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
+    OptionManager options = optimizerRulesContext.getPlannerSettings().getOptions();
     // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
     // once "store.parquet.reader.int96_as_timestamp" will be true by default
-    if(optimizerRulesContext.getPlannerSettings().getOptions()
-        .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) {
-      return ImmutableSet.<StoragePluginOptimizerRule>of(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
+    if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS) ||
+        options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
+      ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
     }
-
-    return ImmutableSet.of();
+    if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)) {
+      ruleBuilder.add(ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.INSTANCE);
+    }
+    return ruleBuilder.build();
   }
 
   private static HiveConf createHiveConf(final Map<String, String> hiveConfigOverride) {
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
index f5ebc5d..e6178b2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWrapper.java
@@ -55,7 +55,7 @@ public class HiveTableWrapper {
   @JsonProperty
   public List<FieldSchemaWrapper> partitionKeys;
   @JsonProperty
-  public Map<String,String> parameters;
+  public Map<String, String> parameters;
   @JsonProperty
   public String viewOriginalText;
   @JsonProperty
@@ -129,6 +129,11 @@ public class HiveTableWrapper {
     return table;
   }
 
+  @JsonIgnore
+  public Map<String, String> getParameters() {
+    return parameters;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder("Table(");
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 5279f28..c8efb65 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -17,12 +17,10 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import io.netty.buffer.DrillBuf;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -34,6 +32,7 @@ import org.apache.drill.exec.expr.holders.Decimal18Holder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.util.DecimalUtility;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -73,18 +73,20 @@ import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-import javax.annotation.Nullable;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
@@ -528,7 +530,7 @@ public class HiveUtilities {
   }
 
   /**
-   * This method checks whether the table is transactional and set necessary properties in {@link JobConf}.
+   * This method checks whether the table is transactional and sets necessary properties in {@link JobConf}.<br>
    * If schema evolution properties aren't set in job conf for the input format, method sets the column names
    * and types from table/partition properties or storage descriptor.
    *
@@ -555,30 +557,129 @@ public class HiveUtilities {
       colTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);
 
       if (colNames == null || colTypes == null) {
-        colNames = Joiner.on(",").join(Lists.transform(sd.getCols(), new Function<FieldSchema, String>()
-        {
-          @Nullable
-          @Override
-          public String apply(@Nullable FieldSchema input)
-          {
-            return input.getName();
-          }
-        }));
-
-        colTypes = Joiner.on(",").join(Lists.transform(sd.getCols(), new Function<FieldSchema, String>()
-        {
-          @Nullable
-          @Override
-          public String apply(@Nullable FieldSchema input)
-          {
-            return input.getType();
-          }
-        }));
+        colNames = sd.getCols().stream()
+            .map(FieldSchema::getName)
+            .collect(Collectors.joining(","));
+        colTypes = sd.getCols().stream()
+            .map(FieldSchema::getType)
+            .collect(Collectors.joining(","));
       }
 
       job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, colNames);
       job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, colTypes);
     }
   }
+
+  /**
+   * Rule is matched when all of the following match:
+   * <ul>
+   * <li>GroupScan in the given DrillScanRel is a {@link HiveScan}</li>
+   * <li>{@link HiveScan} is not already rewritten using Drill's native readers</li>
+   * <li>InputFormat in the table metadata and in all partitions' metadata contains the same value as {@code tableInputFormatClass}</li>
+   * <li>No error occurred while checking the above conditions. An error is logged as a warning.</li>
+   * </ul>
+   * @param call rule call
+   * @return True if the rule can be applied. False otherwise
+   */
+  public static boolean nativeReadersRuleMatches(RelOptRuleCall call, Class tableInputFormatClass) {
+    final DrillScanRel scanRel = call.rel(0);
+
+    if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
+      return false;
+    }
+
+    final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+    final HiveConf hiveConf = hiveScan.getHiveConf();
+    final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
+
+    if (HiveUtilities.containsUnsupportedDataTypes(hiveTable)) {
+      return false;
+    }
+
+    final Class<? extends InputFormat<?, ?>> tableInputFormat = getInputFormatFromSD(
+        HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(), hiveConf);
+    if (tableInputFormat == null || !tableInputFormat.equals(tableInputFormatClass)) {
+      return false;
+    }
+
+    final List<HiveTableWrapper.HivePartitionWrapper> partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers();
+    if (partitions == null) {
+      return true;
+    }
+
+    final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
+    // Make sure all partitions have the same input format as the table input format
+    for (HiveTableWrapper.HivePartitionWrapper partition : partitions) {
+      final StorageDescriptor partitionSD = partition.getPartition().getSd();
+      Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(HiveUtilities.getPartitionMetadata(
+          partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD, hiveConf);
+      if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
+        return false;
+      }
+
+      // Make sure the schema of the table and schema of the partition matches. If not return false. Schema changes
+      // between table and partition can happen when table schema is altered using ALTER statements after some
+      // partitions are already created. Currently native reader conversion doesn't handle schema changes between
+      // partition and table. Hive has extensive list of convert methods to convert from one type to rest of the
+      // possible types. Drill doesn't have the similar set of methods yet.
+      if (!partitionSD.getCols().equals(tableSchema)) {
+        logger.debug("Partitions schema is different from table schema. Currently native reader conversion can't " +
+            "handle schema difference between partitions and table");
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Get the input format from given {@link StorageDescriptor}
+   *
+   * @param properties table properties
+   * @param hiveReadEntry hive read entry
+   * @param sd storage descriptor
+   * @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
+   */
+  private static Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final Properties properties,
+                                                                  final HiveReadEntry hiveReadEntry, final StorageDescriptor sd, final HiveConf hiveConf) {
+    final Table hiveTable = hiveReadEntry.getTable();
+    try {
+      final String inputFormatName = sd.getInputFormat();
+      if (!Strings.isNullOrEmpty(inputFormatName)) {
+        return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormatName);
+      }
+
+      final JobConf job = new JobConf(hiveConf);
+      HiveUtilities.addConfToJob(job, properties);
+      return HiveUtilities.getInputFormatClass(job, sd, hiveTable);
+    } catch (final Exception e) {
+      logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
+          hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Checks whether the Hive table contains
+   * <a href="https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes">
+   * Hive Complex Types</a><p>
+   * TODO: Need to implement it, DRILL-3290. Appropriate (new or existing) Drill types should be selected.
+   *
+   * @param hiveTable Thrift table from Hive Metastore
+   * @return true if table contains unsupported data types, false otherwise
+   */
+  public static boolean containsUnsupportedDataTypes(final Table hiveTable) {
+    for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
+      final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
+      if (category == Category.MAP ||
+          category == Category.STRUCT ||
+          category == Category.UNION ||
+          category == Category.LIST) {
+        logger.debug("Hive table contains unsupported data type: {}", category);
+        return true;
+      }
+    }
+    return false;
+  }
 }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index fd9701c..556deb2 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -42,13 +42,13 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
 
   @BeforeClass
   public static void init() {
-    setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
+    setSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, true);
     setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
   }
 
   @AfterClass
   public static void cleanup() {
-    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER);
     resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 35cc351..634f670 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -387,9 +387,17 @@ public final class ExecConstants {
 
   // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
   // in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
+  @Deprecated // TODO: DRILL-6527. Should be removed in the Drill 1.15.0 release
   public static final String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";
+  @Deprecated // TODO: DRILL-6527. Should be removed in the Drill 1.15.0 release
   public static final OptionValidator HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR =
       new BooleanValidator(HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+  public static final String HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER = "store.hive.parquet.optimize_scan_with_native_reader";
+  public static final OptionValidator HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR =
+      new BooleanValidator(HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER);
+  public static final String HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER = "store.hive.maprdb_json.optimize_scan_with_native_reader";
+  public static final OptionValidator HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR =
+      new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
 
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final long SLICE_TARGET_DEFAULT = 100000l;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e543605..06b4c57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -169,6 +169,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR),
       new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
+      new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
       new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
       new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
       new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 239d556..99caeab 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -547,6 +547,8 @@ drill.exec.options: {
     security.admin.users: "%drill_process_user%",
     store.format: "parquet",
     store.hive.optimize_scan_with_native_readers: false,
+    store.hive.parquet.optimize_scan_with_native_reader: false,
+    store.hive.maprdb_json.optimize_scan_with_native_reader: false,
     store.json.all_text_mode: false,
     store.json.writer.allow_nan_inf: true,
     store.json.reader.allow_nan_inf: true,