Posted to commits@drill.apache.org by ve...@apache.org on 2015/09/29 23:35:49 UTC

drill git commit: DRILL-3209: Add support for reading Hive parquet tables using Drill native parquet reader

Repository: drill
Updated Branches:
  refs/heads/master 32631bb97 -> f78ab8418


DRILL-3209: Add support for reading Hive parquet tables using Drill native parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f78ab841
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f78ab841
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f78ab841

Branch: refs/heads/master
Commit: f78ab84183e73216b76732f66f87ccf48e2340d3
Parents: 32631bb
Author: vkorukanti <ve...@gmail.com>
Authored: Fri Sep 25 10:52:08 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Sep 29 11:59:46 2015 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |   4 +-
 ...onvertHiveParquetScanToDrillParquetScan.java | 292 +++++++++++++++++
 .../store/hive/HiveDrillNativeParquetScan.java  | 109 +++++++
 .../hive/HiveDrillNativeParquetSubScan.java     |  49 +++
 .../hive/HiveDrillNativeScanBatchCreator.java   | 192 +++++++++++
 .../apache/drill/exec/store/hive/HiveScan.java  |  24 +-
 .../exec/store/hive/HiveStoragePlugin.java      |  16 +-
 .../drill/exec/store/hive/HiveSubScan.java      |  24 +-
 .../exec/store/hive/schema/DrillHiveTable.java  |   2 +-
 .../drill/exec/TestHivePartitionPruning.java    |  20 ++
 .../drill/exec/TestHiveProjectPushDown.java     |  30 +-
 .../apache/drill/exec/hive/TestHiveStorage.java | 317 +++++++++++++------
 .../exec/hive/TestInfoSchemaOnHiveStorage.java  |   1 +
 .../exec/store/hive/HiveTestDataGenerator.java  | 108 ++++++-
 .../org/apache/drill/exec/ExecConstants.java    |   6 +
 .../server/options/SystemOptionManager.java     |   1 +
 16 files changed, 1065 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
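
Usage note: the rewrite is gated behind a new planner option added in ExecConstants.java and registered in SystemOptionManager.java (both listed above but not shown in full here). A minimal sketch of enabling and verifying it, assuming the option's string name is store.hive.optimize_scan_with_native_readers (the authoritative value is the HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS constant added in this commit):

  -- Option name assumed; see ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS for the real value.
  ALTER SESSION SET `store.hive.optimize_scan_with_native_readers` = true;

  -- The plan should now show HiveDrillNativeParquetScan instead of the SerDe-based HiveScan
  -- for Parquet-backed Hive tables (the readtest_parquet test table is referenced by the tests below).
  EXPLAIN PLAN FOR SELECT * FROM hive.readtest_parquet;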


http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 11c6455..5009bf1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -106,8 +106,8 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     }
 
     HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions, origReadEntry.hiveConfigOverride);
-    HiveScan newScan = new HiveScan(hiveScan.getUserName(), newReadEntry, hiveScan.storagePlugin, hiveScan.columns);
-    return newScan;
+
+    return hiveScan.clone(newReadEntry);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
new file mode 100644
index 0000000..47700c9
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.logical;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a Hive scan to use Drill's native parquet reader instead of Hive's SerDe-based record reader. It also
+ * adds a project to convert/cast the output of Drill's native parquet reader to match the expected output of
+ * Hive's reader.
+ */
+public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptimizerRule {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(ConvertHiveParquetScanToDrillParquetScan.class);
+
+  public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan();
+
+  private static final DrillSqlOperator INT96_TO_TIMESTAMP =
+      new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true);
+
+  private ConvertHiveParquetScanToDrillParquetScan() {
+    super(RelOptHelper.any(DrillScanRel.class), "ConvertHiveScanToHiveDrillNativeScan:Parquet");
+  }
+
+  /**
+   * The rule matches when all of the following are true:
+   * 1) The GroupScan in the given DrillScanRel is a {@link HiveScan}
+   * 2) The {@link HiveScan} is not already rewritten to use Drill's native readers
+   * 3) The InputFormat in the Hive table metadata and in the metadata of all partitions is
+   *    {@link MapredParquetInputFormat}
+   * 4) No error occurred while checking the above conditions. Errors are logged as warnings.
+   *
+   * @param call
+   * @return True if the rule can be applied. False otherwise
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final DrillScanRel scanRel = (DrillScanRel) call.rel(0);
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+
+    if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
+      return false;
+    }
+
+    final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+    final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+
+    final Class<? extends InputFormat> tableInputFormat = getInputFormatFromSD(hiveTable, hiveTable.getSd());
+    if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
+      return false;
+    }
+
+    final List<HivePartition> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
+    if (partitions == null) {
+      return true;
+    }
+
+    // Make sure all partitions have the same input format as the table input format
+    for (HivePartition partition : partitions) {
+      Class<? extends InputFormat> inputFormat = getInputFormatFromSD(hiveTable, partition.getPartition().getSd());
+      if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Get the input format from the given {@link StorageDescriptor}.
+   * @param hiveTable
+   * @param sd
+   * @return {@link InputFormat} class or null if a failure occurred. The failure is logged as a warning.
+   */
+  private Class<? extends InputFormat> getInputFormatFromSD(final Table hiveTable, final StorageDescriptor sd) {
+    try {
+      return (Class<? extends InputFormat>) Class.forName(sd.getInputFormat());
+    } catch (ReflectiveOperationException e) {
+      logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
+          hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
+      return null;
+    }
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    try {
+      final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+      final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+
+      final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+      final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
+
+      final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+      checkForUnsupportedDataTypes(hiveTable);
+
+      final Map<String, String> partitionColMapping =
+          getPartitionColMapping(hiveTable, partitionColumnLabel);
+
+      final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
+      final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
+
+      call.transformTo(projectRel);
+    } catch (final Exception e) {
+      logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e);
+    }
+  }
+
+  /**
+   * Create a mapping from Hive partition column names to Drill's directory column names
+   * (partition column label followed by the partition key index).
+   */
+  private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
+    final Map<String, String> partitionColMapping = Maps.newHashMap();
+    int i = 0;
+    for (FieldSchema col : hiveTable.getPartitionKeys()) {
+      partitionColMapping.put(col.getName(), partitionColumnLabel+i);
+      i++;
+    }
+
+    return partitionColMapping;
+  }
+
+  /**
+   * Helper method which creates a DrillScanRel with a native HiveDrillNativeParquetScan.
+   */
+  private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
+      final DrillScanRel hiveScanRel) throws Exception{
+
+    final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
+    final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+
+    final List<String> nativeScanColNames = Lists.newArrayList();
+    final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+    for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
+      final String dirColName = partitionColMapping.get(field.getName());
+      if (dirColName != null) { // partition column
+        nativeScanColNames.add(dirColName);
+        nativeScanColTypes.add(varCharType);
+      } else {
+        nativeScanColNames.add(field.getName());
+        nativeScanColTypes.add(field.getType());
+      }
+    }
+
+    final RelDataType nativeScanRowType = typeFactory.createStructType(nativeScanColTypes, nativeScanColNames);
+
+    // Create the list of projected columns set on the HiveScan. The order of this list may not be the same as the
+    // order of columns in the HiveScan row type. Note: if HiveScan.getColumns() contains a '*', we add it as is,
+    // unlike above where the '*' was expanded. HiveScan and the related sub-scan can handle '*'.
+    final List<SchemaPath> nativeScanCols = Lists.newArrayList();
+    for(SchemaPath colName : hiveScanRel.getColumns()) {
+      final String partitionCol = partitionColMapping.get(colName.getAsUnescapedPath());
+      if (partitionCol != null) {
+        nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
+      } else {
+        nativeScanCols.add(colName);
+      }
+    }
+
+    final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+    final HiveDrillNativeParquetScan nativeHiveScan =
+        new HiveDrillNativeParquetScan(
+            hiveScan.getUserName(),
+            hiveScan.hiveReadEntry,
+            hiveScan.storagePlugin,
+            nativeScanCols);
+
+    return new DrillScanRel(
+        hiveScanRel.getCluster(),
+        hiveScanRel.getTraitSet(),
+        hiveScanRel.getTable(),
+        nativeHiveScan,
+        nativeScanRowType,
+        nativeScanCols);
+  }
+
+  /**
+   * Create a project that converts the native scan output to the expected output of the Hive scan.
+   */
+  private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
+      final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
+
+    final List<RexNode> rexNodes = Lists.newArrayList();
+    final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
+    final RelDataType hiveScanRowType = hiveScanRel.getRowType();
+
+    for (String colName : hiveScanRowType.getFieldNames()) {
+      final String dirColName = partitionColMapping.get(colName);
+      if (dirColName != null) {
+        rexNodes.add(createPartitionColumnCast(hiveScanRel, nativeScanRel, colName, dirColName, rb));
+      } else {
+        rexNodes.add(createColumnFormatConversion(hiveScanRel, nativeScanRel, colName, rb));
+      }
+    }
+
+    return DrillProjectRel.create(
+        hiveScanRel.getCluster(), hiveScanRel.getTraitSet(), nativeScanRel, rexNodes,
+        hiveScanRowType /* project rowtype and HiveScanRel rowtype should be the same */);
+  }
+
+  /**
+   * Apply any data format conversion expressions.
+   */
+  private RexNode createColumnFormatConversion(final DrillScanRel hiveScanRel, final DrillScanRel nativeScanRel,
+      final String colName, final RexBuilder rb) {
+
+    final RelDataType outputType = hiveScanRel.getRowType().getField(colName, false, false).getType();
+    final RelDataTypeField inputField = nativeScanRel.getRowType().getField(colName, false, false);
+    final RexInputRef inputRef = rb.makeInputRef(inputField.getType(), inputField.getIndex());
+
+    if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP) {
+      // Hive stores TIMESTAMP as INT96 in parquet format. Use the convert_fromTIMESTAMP_IMPALA UDF to convert
+      // INT96 data to TIMESTAMP.
+      return rb.makeCall(INT96_TO_TIMESTAMP, inputRef);
+    }
+
+    return inputRef;
+  }
+
+  /**
+   * Create a cast for a partition column. Partition columns are output as VARCHAR by the native parquet reader.
+   * Cast them to the appropriate type according to the partition type in the HiveScan.
+   */
+  private RexNode createPartitionColumnCast(final DrillScanRel hiveScanRel, final DrillScanRel nativeScanRel,
+      final String outputColName, final String dirColName, final RexBuilder rb) {
+
+    final RelDataType outputType = hiveScanRel.getRowType().getField(outputColName, false, false).getType();
+    final RelDataTypeField inputField = nativeScanRel.getRowType().getField(dirColName, false, false);
+    final RexInputRef inputRef =
+        rb.makeInputRef(rb.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), inputField.getIndex());
+
+    return rb.makeCast(outputType, inputRef);
+  }
+
+  private void checkForUnsupportedDataTypes(final Table hiveTable) {
+    for(FieldSchema hiveField : hiveTable.getSd().getCols()) {
+      final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
+      if (category == Category.MAP ||
+          category == Category.STRUCT ||
+          category == Category.UNION ||
+          category == Category.LIST) {
+        HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString());
+      }
+    }
+  }
+}
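
For context, the project created by this rule does two kinds of rewrites: TIMESTAMP columns (stored by Hive as INT96 in parquet) go through convert_fromTIMESTAMP_IMPALA, and partition columns (read by the native reader as VARCHAR dir-columns) are cast back to their Hive partition types. As a rough hand-written SQL equivalent (illustrative only; the file path and column names are hypothetical, and the actual rewrite is built as a DrillProjectRel on top of a DrillScanRel rather than as SQL):

  SELECT CONVERT_FROM(timestamp_field, 'TIMESTAMP_IMPALA') AS timestamp_field,
         CAST(dir0 AS INT) AS int_part
  FROM dfs.`/path/to/the/table/files`;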

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
new file mode 100644
index 0000000..4d495da
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Extension of {@link HiveScan} which supports reading Hive tables using Drill's native parquet reader.
+ */
+@JsonTypeName("hive-drill-native-parquet-scan")
+public class HiveDrillNativeParquetScan extends HiveScan {
+
+  @JsonCreator
+  public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
+                                    @JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
+                                    @JsonProperty("storage-plugin") String storagePluginName,
+                                    @JsonProperty("columns") List<SchemaPath> columns,
+                                    @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+    super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
+  }
+
+  public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin,
+      List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName, hiveReadEntry, storagePlugin, columns);
+  }
+
+  public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
+    super(hiveScan);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    final ScanStats nativeHiveScanStats = super.getScanStats();
+
+    // As Drill's native parquet record reader is faster and more memory efficient, divide the costs by a factor.
+    return new ScanStats(
+        nativeHiveScanStats.getGroupScanProperty(),
+        nativeHiveScanStats.getRecordCount()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
+        nativeHiveScanStats.getCpuCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
+        nativeHiveScanStats.getDiskCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR);
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    try {
+      return new HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
+    } catch (IOException | ReflectiveOperationException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public boolean isNativeReader() {
+    return true;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return new HiveDrillNativeParquetScan(this);
+  }
+
+  @Override
+  public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
+    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    final HiveDrillNativeParquetScan scan = new HiveDrillNativeParquetScan(this);
+    scan.columns = columns;
+    return scan;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
+        + ", inputSplits=" + inputSplits
+        + ", columns=" + columns
+        + ", partitions= " + hiveReadEntry.getHivePartitionWrappers() +"]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
new file mode 100644
index 0000000..b37b258
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Extension of {@link HiveSubScan} which supports reading Hive tables using Drill's native parquet reader.
+ */
+@JsonTypeName("hive-drill-native-parquet-sub-scan")
+public class HiveDrillNativeParquetSubScan extends HiveSubScan {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeParquetSubScan.class);
+
+  @JsonCreator
+  public HiveDrillNativeParquetSubScan(@JsonProperty("userName") String userName,
+                                       @JsonProperty("splits") List<String> splits,
+                                       @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
+                                       @JsonProperty("splitClasses") List<String> splitClasses,
+                                       @JsonProperty("columns") List<SchemaPath> columns)
+      throws IOException, ReflectiveOperationException {
+    super(userName, splits, hiveReadEntry, splitClasses, columns);
+  }
+
+  public HiveDrillNativeParquetSubScan(final HiveSubScan subScan) throws IOException, ReflectiveOperationException {
+    this(subScan.getUserName(), subScan.getSplits(), subScan.getHiveReadEntry(), subScan.getSplitClasses(),
+        subScan.getColumns());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
new file mode 100644
index 0000000..d03a4bf
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+@SuppressWarnings("unused")
+public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);
+
+  @Override
+  public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    final Table table = config.getTable();
+    final List<InputSplit> splits = config.getInputSplits();
+    final List<Partition> partitions = config.getPartitions();
+    final List<SchemaPath> columns = config.getColumns();
+    final Map<String, String> hiveConfigOverride = config.getHiveReadEntry().hiveConfigOverride;
+    final String partitionDesignator = context.getOptions()
+        .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+
+    final boolean hasPartitions = (partitions != null && partitions.size() > 0);
+
+    final List<String[]> partitionColumns = Lists.newArrayList();
+    final List<Integer> selectedPartitionColumns = Lists.newArrayList();
+    List<SchemaPath> newColumns = columns;
+    if (AbstractRecordReader.isStarQuery(columns)) {
+      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+        selectedPartitionColumns.add(i);
+      }
+    } else {
+      // Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
+      // ParquetRecordReader. Partition columns are passed to ScanBatch.
+      newColumns = Lists.newArrayList();
+      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+      for (SchemaPath column : columns) {
+        Matcher m = pattern.matcher(column.getAsUnescapedPath());
+        if (m.matches()) {
+          selectedPartitionColumns.add(
+              Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+        } else {
+          newColumns.add(column);
+        }
+      }
+    }
+
+    final OperatorContext oContext = context.newOperatorContext(config,
+        false /* ScanBatch is not subject to fragment memory limit */);
+
+    int currentPartitionIndex = 0;
+    boolean success = false;
+    final List<RecordReader> readers = Lists.newArrayList();
+
+    final Configuration conf = getConf(hiveConfigOverride);
+
+    // TODO: In the future we can populate this cache from the Parquet metadata cached on the filesystem.
+    final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
+
+    try {
+      for (InputSplit split : splits) {
+        final FileSplit fileSplit = (FileSplit) split;
+        final Path finalPath = fileSplit.getPath();
+        final JobConf cloneJob =
+            new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
+        final FileSystem fs = finalPath.getFileSystem(cloneJob);
+
+        ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
+        if (parquetMetadata == null){
+          parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
+          footerCache.put(finalPath.toString(), parquetMetadata);
+        }
+        final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
+
+        for(int rowGroupNum : rowGroupNums) {
+          readers.add(new ParquetRecordReader(
+                  context,
+                  Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
+                  rowGroupNum, fs,
+                  new DirectCodecFactory(fs.getConf(), oContext.getAllocator()),
+                  parquetMetadata,
+                  newColumns)
+          );
+
+          if (hasPartitions) {
+            Partition p = partitions.get(currentPartitionIndex);
+            partitionColumns.add(p.getValues().toArray(new String[0]));
+          }
+        }
+        currentPartitionIndex++;
+      }
+      success = true;
+    } catch (final IOException e) {
+      throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
+    } finally {
+      if (!success) {
+        for(RecordReader reader : readers) {
+          AutoCloseables.close(reader, logger);
+        }
+      }
+    }
+
+    // If there are no readers created (which is possible when the table is empty or no row groups are matched),
+    // create an empty RecordReader to output the schema
+    if (readers.size() == 0) {
+      readers.add(new HiveRecordReader(table, null, null, columns, context, hiveConfigOverride));
+    }
+
+    return new ScanBatch(config, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
+  }
+
+  private Configuration getConf(final Map<String, String> hiveConfigOverride) {
+    final HiveConf hiveConf = new HiveConf();
+    for(Entry<String, String> prop : hiveConfigOverride.entrySet()) {
+      hiveConf.set(prop.getKey(), prop.getValue());
+    }
+
+    return hiveConf;
+  }
+
+  /**
+   * Get the list of row group numbers for the given file input split. The logic used here is the same as how Hive's
+   * parquet input format finds the row group numbers for an input split.
+   */
+  private List<Integer> getRowGroupNumbersFromFileSplit(final FileSplit split,
+      final ParquetMetadata footer) throws IOException {
+    final List<BlockMetaData> blocks = footer.getBlocks();
+
+    final long splitStart = split.getStart();
+    final long splitLength = split.getLength();
+
+    final List<Integer> rowGroupNums = Lists.newArrayList();
+
+    int i = 0;
+    for (final BlockMetaData block : blocks) {
+      final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+      if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+        rowGroupNums.add(i);
+      }
+      i++;
+    }
+
+    return rowGroupNums;
+  }
+}
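
A worked example of getRowGroupNumbersFromFileSplit above (numbers are illustrative): for a split covering bytes [0, 128MB) of a file whose three row groups have first data page offsets at 4 bytes, 60MB and 180MB, row groups 0 and 1 are assigned to this split and row group 2 to the split starting at 128MB, so every row group is read by exactly one ParquetRecordReader.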

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 9ada569..de800a7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -69,10 +69,12 @@ import org.apache.hadoop.security.UserGroupInformation;
 public class HiveScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
 
+  protected static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100;
+
   @JsonProperty("hive-table")
   public HiveReadEntry hiveReadEntry;
   @JsonIgnore
-  private List<InputSplit> inputSplits = Lists.newArrayList();
+  protected List<InputSplit> inputSplits = Lists.newArrayList();
   @JsonIgnore
   public HiveStoragePlugin storagePlugin;
   @JsonProperty("storage-plugin")
@@ -121,7 +123,7 @@ public class HiveScan extends AbstractGroupScan {
     this.storagePluginName = storagePlugin.getName();
   }
 
-  private HiveScan(final HiveScan that) {
+  public HiveScan(final HiveScan that) {
     super(that);
     this.columns = that.columns;
     this.endpoints = that.endpoints;
@@ -134,6 +136,10 @@ public class HiveScan extends AbstractGroupScan {
     this.rowCount = that.rowCount;
   }
 
+  public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
+    return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+  }
+
   public List<SchemaPath> getColumns() {
     return columns;
   }
@@ -312,6 +318,15 @@ public class HiveScan extends AbstractGroupScan {
         // having a rowCount of 0 can mean the statistics were never computed
         estRowCount = data/1024;
       }
+
+      // Hive's SerDe-based reader is neither memory efficient nor fast. If the row count is below
+      // HIVE_SERDE_SCAN_OVERHEAD_FACTOR, raise it to at least HIVE_SERDE_SCAN_OVERHEAD_FACTOR so that the planner
+      // can choose HiveDrillNativeParquetScan. Because of the project added on top of HiveDrillNativeParquetScan,
+      // the planner would otherwise end up choosing HiveScan when the cost is too low.
+      if (estRowCount <= HIVE_SERDE_SCAN_OVERHEAD_FACTOR) {
+        estRowCount = HIVE_SERDE_SCAN_OVERHEAD_FACTOR;
+      }
+
       logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
       return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
     } catch (final IOException e) {
@@ -357,4 +372,9 @@ public class HiveScan extends AbstractGroupScan {
     }
     return true;
   }
+
+  @JsonIgnore
+  public boolean isNativeReader() {
+    return false;
+  }
 }
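
To see how this floor interacts with the discount in HiveDrillNativeParquetScan.getScanStats() (illustrative numbers, with HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100): a table estimated at 50,000 rows keeps a row-count cost of 50,000 as a plain HiveScan but only 50,000 / 100 = 500 as a HiveDrillNativeParquetScan, so the native scan wins even after the added project. A tiny table estimated at 20 rows is bumped up to 100 here, and the native scan's discounted 100 / 100 = 1 still leaves a gap large enough for the planner to pick the native plan.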

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 23aa37f..191a253 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -29,7 +29,9 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -92,9 +94,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
     final String defaultPartitionValue = HiveUtilities.getDefaultPartitionValue(config.getHiveConfigOverride());
 
-    return ImmutableSet.of(
-        HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext, defaultPartitionValue),
-        HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext, defaultPartitionValue));
+    ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
+
+    ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext, defaultPartitionValue));
+    ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext, defaultPartitionValue));
+
+    if(optimizerRulesContext.getPlannerSettings().getOptions()
+        .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) {
+      ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
+    }
+
+    return ruleBuilder.build();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 2181c2a..907539e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -19,17 +19,22 @@ package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.RecordReader;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.InputSplit;
@@ -45,21 +50,18 @@ import com.google.common.io.ByteStreams;
 
 @JsonTypeName("hive-sub-scan")
 public class HiveSubScan extends AbstractBase implements SubScan {
-
-  private List<String> splits;
-
-  private HiveReadEntry hiveReadEntry;
-
-  private List<String> splitClasses;
-
-  private List<SchemaPath> columns;
+  protected HiveReadEntry hiveReadEntry;
 
   @JsonIgnore
-  private List<InputSplit> inputSplits = Lists.newArrayList();
+  protected List<InputSplit> inputSplits = Lists.newArrayList();
   @JsonIgnore
-  private Table table;
+  protected Table table;
   @JsonIgnore
-  private List<Partition> partitions;
+  protected List<Partition> partitions;
+
+  private List<String> splits;
+  private List<String> splitClasses;
+  protected List<SchemaPath> columns;
 
   @JsonCreator
   public HiveSubScan(@JsonProperty("userName") String userName,

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index b459ee4..6583b9a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -104,7 +104,7 @@ public class DrillHiveTable extends DrillTable{
         return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
 
       case BINARY:
-        return typeFactory.createSqlType(SqlTypeName.BINARY);
+        return typeFactory.createSqlType(SqlTypeName.VARBINARY);
 
       case DECIMAL: {
         DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo)pTypeInfo;

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index f0b4bdc..9009334 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.exec.hive.HiveTestBase;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -112,6 +113,25 @@ public class TestHivePartitionPruning extends HiveTestBase {
     assertFalse(plan.contains("Filter"));
   }
 
+  @Test
+  public void pruneDataTypeSupportNativeReaders() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+      final String query = "EXPLAIN PLAN FOR " +
+          "SELECT * FROM hive.readtest_parquet WHERE boolean_part = true";
+
+      final String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+      // Check and make sure that Filter is not present in the plan
+      assertFalse(plan.contains("Filter"));
+
+      // Make sure the plan contains the Hive scan utilizing native parquet reader
+      assertTrue(plan.contains("groupscan=[HiveDrillNativeParquetScan"));
+    } finally {
+      test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+    }
+  }
+
   @Test // DRILL-3579
   public void selectFromPartitionedTableWithNullPartitions() throws Exception {
     final String query = "SELECT count(*) nullCount FROM hive.partition_pruning_test " +

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 6423a36..32f1682 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.exec.hive.HiveTestBase;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -38,8 +39,8 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
   }
 
-  private void testHelper(String query, String expectedColNamesInPlan, int expectedRecordCount)throws Exception {
-    testPhysicalPlan(query, expectedColNamesInPlan);
+  private void testHelper(String query, int expectedRecordCount, String... expectedSubstrs)throws Exception {
+    testPhysicalPlan(query, expectedSubstrs);
 
     int actualRecordCount = testSql(query);
     assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
@@ -51,7 +52,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String query = "SELECT `value` as v FROM hive.`default`.kv";
     String expectedColNames = " \"columns\" : [ \"`value`\" ]";
 
-    testHelper(query, expectedColNames, 5);
+    testHelper(query, 5, expectedColNames);
   }
 
   @Test
@@ -59,7 +60,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String query = "SELECT boolean_field as b_f, tinyint_field as ti_f FROM hive.`default`.readtest";
     String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\" ]";
 
-    testHelper(query, expectedColNames, 2);
+    testHelper(query, 2, expectedColNames);
   }
 
   @Test
@@ -67,7 +68,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String query = "SELECT double_part as dbl_p FROM hive.`default`.readtest";
     String expectedColNames = " \"columns\" : [ \"`double_part`\" ]";
 
-    testHelper(query, expectedColNames, 2);
+    testHelper(query, 2, expectedColNames);
   }
 
   @Test
@@ -75,7 +76,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String query = "SELECT double_part as dbl_p, decimal0_part as dec_p FROM hive.`default`.readtest";
     String expectedColNames = " \"columns\" : [ \"`double_part`\", \"`decimal0_part`\" ]";
 
-    testHelper(query, expectedColNames, 2);
+    testHelper(query, 2, expectedColNames);
   }
 
   @Test
@@ -85,7 +86,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\", " +
         "\"`double_part`\", \"`decimal0_part`\" ]";
 
-    testHelper(query, expectedColNames, 2);
+    testHelper(query, 2, expectedColNames);
   }
 
   @Test
@@ -93,6 +94,19 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     String query = "SELECT * FROM hive.`default`.kv";
     String expectedColNames = " \"columns\" : [ \"`key`\", \"`value`\" ]";
 
-    testHelper(query, expectedColNames, 5);
+    testHelper(query, 5, expectedColNames);
+  }
+
+  @Test
+  public void projectPushDownOnHiveParquetTable() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+      String query = "SELECT boolean_field, boolean_part, int_field, int_part FROM hive.readtest_parquet";
+      String expectedColNames = "\"columns\" : [ \"`boolean_field`\", \"`dir1`\", \"`int_field`\", \"`dir10`\" ]";
+
+      testHelper(query, 2, expectedColNames, "hive-drill-native-parquet-scan");
+    } finally {
+      test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 9211af6..1112e8c 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -18,9 +18,12 @@
 package org.apache.drill.exec.hive;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.hadoop.fs.FileSystem;
 import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -28,6 +31,11 @@ import java.sql.Date;
 import java.sql.Timestamp;
 
 public class TestHiveStorage extends HiveTestBase {
+  @BeforeClass
+  public static void setupOptions() throws Exception {
+    test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+  }
+
   @Test
   public void hiveReadWithDb() throws Exception {
     test("select * from hive.kv");
@@ -48,7 +56,7 @@ public class TestHiveStorage extends HiveTestBase {
         .unOrdered()
         .baselineColumns("col1")
         .baselineValues("binaryfield")
-        .baselineValues(new Object[] { null })
+        .baselineValues(new Object[]{null})
         .go();
   }
 
@@ -59,105 +67,205 @@ public class TestHiveStorage extends HiveTestBase {
    */
   @Test
   public void readAllSupportedHiveDataTypes() throws Exception {
-      try {
-          // enable decimal type
-          test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
-
-          testBuilder().sqlQuery("SELECT * FROM hive.readtest")
-              .unOrdered()
-              .baselineColumns(
-                  "binary_field",
-                  "boolean_field",
-                  "tinyint_field",
-                  "decimal0_field",
-                  "decimal9_field",
-                  "decimal18_field",
-                  "decimal28_field",
-                  "decimal38_field",
-                  "double_field",
-                  "float_field",
-                  "int_field",
-                  "bigint_field",
-                  "smallint_field",
-                  "string_field",
-                  "varchar_field",
-                  "timestamp_field",
-                  "date_field",
-                  "binary_part",
-                  "boolean_part",
-                  "tinyint_part",
-                  "decimal0_part",
-                  "decimal9_part",
-                  "decimal18_part",
-                  "decimal28_part",
-                  "decimal38_part",
-                  "double_part",
-                  "float_part",
-                  "int_part",
-                  "bigint_part",
-                  "smallint_part",
-                  "string_part",
-                  "varchar_part",
-                  "timestamp_part",
-                  "date_part")
-              .baselineValues(
-                  "binaryfield",
-                  false,
-                  34,
-                  new BigDecimal("66"),
-                  new BigDecimal("2347.92"),
-                  new BigDecimal("2758725827.99990"),
-                  new BigDecimal("29375892739852.8"),
-                  new BigDecimal("89853749534593985.783"),
-                  8.345d,
-                  4.67f,
-                  123456,
-                  234235L,
-                  3455,
-                  "stringfield",
-                  "varcharfield",
-                  new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-                  new DateTime(Date.valueOf("2013-07-05").getTime()),
-                  "binary",
-                  true,
-                  64,
-                  new BigDecimal("37"),
-                  new BigDecimal("36.90"),
-                  new BigDecimal("3289379872.94565"),
-                  new BigDecimal("39579334534534.4"),
-                  new BigDecimal("363945093845093890.900"),
-                  8.345d,
-                  4.67f,
-                  123456,
-                  234235L,
-                  3455,
-                  "string",
-                  "varchar",
-                  new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-                  new DateTime(Date.valueOf("2013-07-05").getTime()))
-              .baselineValues( // All fields are null, but partition fields have non-null values
-                  null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
-                  "binary",
-                  true,
-                  64,
-                  new BigDecimal("37"),
-                  new BigDecimal("36.90"),
-                  new BigDecimal("3289379872.94565"),
-                  new BigDecimal("39579334534534.4"),
-                  new BigDecimal("363945093845093890.900"),
-                  8.345d,
-                  4.67f,
-                  123456,
-                  234235L,
-                  3455,
-                  "string",
-                  "varchar",
-                  new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-                  new DateTime(Date.valueOf("2013-07-05").getTime()))
-              .go();
-      } finally {
-          test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
-      }
+    testBuilder().sqlQuery("SELECT * FROM hive.readtest")
+        .unOrdered()
+        .baselineColumns(
+            "binary_field",
+            "boolean_field",
+            "tinyint_field",
+            "decimal0_field",
+            "decimal9_field",
+            "decimal18_field",
+            "decimal28_field",
+            "decimal38_field",
+            "double_field",
+            "float_field",
+            "int_field",
+            "bigint_field",
+            "smallint_field",
+            "string_field",
+            "varchar_field",
+            "timestamp_field",
+            "date_field",
+            "binary_part",
+            "boolean_part",
+            "tinyint_part",
+            "decimal0_part",
+            "decimal9_part",
+            "decimal18_part",
+            "decimal28_part",
+            "decimal38_part",
+            "double_part",
+            "float_part",
+            "int_part",
+            "bigint_part",
+            "smallint_part",
+            "string_part",
+            "varchar_part",
+            "timestamp_part",
+            "date_part")
+        .baselineValues(
+            "binaryfield",
+            false,
+            34,
+            new BigDecimal("66"),
+            new BigDecimal("2347.92"),
+            new BigDecimal("2758725827.99990"),
+            new BigDecimal("29375892739852.8"),
+            new BigDecimal("89853749534593985.783"),
+            8.345d,
+            4.67f,
+            123456,
+            234235L,
+            3455,
+            "stringfield",
+            "varcharfield",
+            new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+            new DateTime(Date.valueOf("2013-07-05").getTime()),
+            "binary",
+            true,
+            64,
+            new BigDecimal("37"),
+            new BigDecimal("36.90"),
+            new BigDecimal("3289379872.94565"),
+            new BigDecimal("39579334534534.4"),
+            new BigDecimal("363945093845093890.900"),
+            8.345d,
+            4.67f,
+            123456,
+            234235L,
+            3455,
+            "string",
+            "varchar",
+            new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+            new DateTime(Date.valueOf("2013-07-05").getTime()))
+        .baselineValues( // All fields are null, but partition fields have non-null values
+            null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+            "binary",
+            true,
+            64,
+            new BigDecimal("37"),
+            new BigDecimal("36.90"),
+            new BigDecimal("3289379872.94565"),
+            new BigDecimal("39579334534534.4"),
+            new BigDecimal("363945093845093890.900"),
+            8.345d,
+            4.67f,
+            123456,
+            234235L,
+            3455,
+            "string",
+            "varchar",
+            new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+            new DateTime(Date.valueOf("2013-07-05").getTime()))
+        .go();
+  }
+
+  /**
+   * Test to ensure Drill reads all the supported types through the native Parquet readers.
+   * NOTE: As part of the Hive 1.2 upgrade, make sure this test and {@link #readAllSupportedHiveDataTypes()} are merged
+   * into one test.
+   */
+  @Test
+  public void readAllSupportedHiveDataTypesNativeParquet() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+      final String query = "SELECT * FROM hive.readtest_parquet";
+
+      // Make sure the plan has Hive scan with native parquet reader
+      testPhysicalPlan(query, "hive-drill-native-parquet-scan");
+
+      testBuilder().sqlQuery(query)
+          .unOrdered()
+          .baselineColumns(
+              "boolean_field",
+              "tinyint_field",
+              "decimal0_field",
+              "decimal9_field",
+              "decimal18_field",
+              "decimal28_field",
+              "decimal38_field",
+              "double_field",
+              "float_field",
+              "int_field",
+              "bigint_field",
+              "smallint_field",
+              "string_field",
+              "varchar_field",
+              "timestamp_field",
+              "binary_part",
+              "boolean_part",
+              "tinyint_part",
+              "decimal0_part",
+              "decimal9_part",
+              "decimal18_part",
+              "decimal28_part",
+              "decimal38_part",
+              "double_part",
+              "float_part",
+              "int_part",
+              "bigint_part",
+              "smallint_part",
+              "string_part",
+              "varchar_part",
+              "timestamp_part",
+              "date_part")
+          .baselineValues(
+              false,
+              34,
+              new BigDecimal("66"),
+              new BigDecimal("2347.92"),
+              new BigDecimal("2758725827.99990"),
+              new BigDecimal("29375892739852.8"),
+              new BigDecimal("89853749534593985.783"),
+              8.345d,
+              4.67f,
+              123456,
+              234235L,
+              3455,
+              "stringfield",
+              "varcharfield",
+              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+              "binary",
+              true,
+              64,
+              new BigDecimal("37"),
+              new BigDecimal("36.90"),
+              new BigDecimal("3289379872.94565"),
+              new BigDecimal("39579334534534.4"),
+              new BigDecimal("363945093845093890.900"),
+              8.345d,
+              4.67f,
+              123456,
+              234235L,
+              3455,
+              "string",
+              "varchar",
+              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+              new DateTime(Date.valueOf("2013-07-05").getTime()))
+          .baselineValues( // All fields are null, but partition fields have non-null values
+              null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+              "binary",
+              true,
+              64,
+              new BigDecimal("37"),
+              new BigDecimal("36.90"),
+              new BigDecimal("3289379872.94565"),
+              new BigDecimal("39579334534534.4"),
+              new BigDecimal("363945093845093890.900"),
+              8.345d,
+              4.67f,
+              123456,
+              234235L,
+              3455,
+              "string",
+              "varchar",
+              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+              new DateTime(Date.valueOf("2013-07-05").getTime()))
+          .go();
+    } finally {
+      test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+    }
   }
 
   @Test
@@ -190,8 +298,8 @@ public class TestHiveStorage extends HiveTestBase {
 
   @Test // DRILL-745
   public void queryingHiveAvroTable() throws Exception {
-    testBuilder()
-        .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
+    testBuilder()
+        .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
         .unOrdered()
         .baselineColumns("key", "value")
         .baselineValues(5, " key_5")
@@ -218,4 +326,9 @@ public class TestHiveStorage extends HiveTestBase {
         .baselineValues(1L)
         .go();
   }
+
+  @AfterClass
+  public static void shutdownOptions() throws Exception {
+    test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+  }
 }
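
The readAllSupportedHiveDataTypesNativeParquet test above enables the new session option and then asserts that the
physical plan contains the "hive-drill-native-parquet-scan" operator. A minimal sketch of the same check run
interactively (assuming the hive storage plugin is registered and the readtest_parquet table created by the test data
generator below exists):

    ALTER SESSION SET `store.hive.optimize_scan_with_native_readers` = true;
    EXPLAIN PLAN FOR SELECT * FROM hive.readtest_parquet;
    -- the plan text should mention a hive-drill-native-parquet-scan group scan
    ALTER SESSION SET `store.hive.optimize_scan_with_native_readers` = false;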

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 6118be5..d203bd4 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -34,6 +34,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
         .baselineValues("hive.default", "partition_pruning_test")
         .baselineValues("hive.default", "readtest")
+        .baselineValues("hive.default", "readtest_parquet")
         .baselineValues("hive.default", "empty_table")
         .baselineValues("hive.default", "infoschematest")
         .baselineValues("hive.default", "hiveview")

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 34a7ed6..17a433f 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -253,7 +253,7 @@ public class HiveTestDataGenerator {
 
     // Load data into table 'readtest'
     executeQuery(hiveDriver,
-        String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE default.readtest PARTITION (" +
+        String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.readtest PARTITION (" +
         "  binary_part='binary', " +
         "  boolean_part='true', " +
         "  tinyint_part='64', " +
@@ -295,6 +295,112 @@ public class HiveTestDataGenerator {
         "uniontypeType UNIONTYPE<int, double, array<string>>)"
     );
 
+    /**
+     * Create a PARQUET table with all supported types. In Hive 1.0.0, the Hive Parquet format doesn't support BINARY
+     * and DATE types. Once the Hive storage plugin is upgraded to Hive 1.2, convert the DDL following this comment
+     * into the following single statement:
+     *
+     * executeQuery(hiveDriver, "CREATE TABLE readtest_parquet STORED AS parquet AS SELECT * FROM readtest");
+     */
+    executeQuery(hiveDriver,
+        "CREATE TABLE readtest_parquet (" +
+            "  boolean_field BOOLEAN, " +
+            "  tinyint_field TINYINT," +
+            "  decimal0_field DECIMAL," +
+            "  decimal9_field DECIMAL(6, 2)," +
+            "  decimal18_field DECIMAL(15, 5)," +
+            "  decimal28_field DECIMAL(23, 1)," +
+            "  decimal38_field DECIMAL(30, 3)," +
+            "  double_field DOUBLE," +
+            "  float_field FLOAT," +
+            "  int_field INT," +
+            "  bigint_field BIGINT," +
+            "  smallint_field SMALLINT," +
+            "  string_field STRING," +
+            "  varchar_field VARCHAR(50)," +
+            "  timestamp_field TIMESTAMP" +
+            ") PARTITIONED BY (" +
+            "  binary_part BINARY," +
+            "  boolean_part BOOLEAN," +
+            "  tinyint_part TINYINT," +
+            "  decimal0_part DECIMAL," +
+            "  decimal9_part DECIMAL(6, 2)," +
+            "  decimal18_part DECIMAL(15, 5)," +
+            "  decimal28_part DECIMAL(23, 1)," +
+            "  decimal38_part DECIMAL(30, 3)," +
+            "  double_part DOUBLE," +
+            "  float_part FLOAT," +
+            "  int_part INT," +
+            "  bigint_part BIGINT," +
+            "  smallint_part SMALLINT," +
+            "  string_part STRING," +
+            "  varchar_part VARCHAR(50)," +
+            "  timestamp_part TIMESTAMP," +
+            "  date_part DATE" +
+            ") STORED AS parquet "
+    );
+
+    executeQuery(hiveDriver, "INSERT OVERWRITE TABLE readtest_parquet " +
+        "PARTITION (" +
+        "  binary_part='binary', " +
+        "  boolean_part='true', " +
+        "  tinyint_part='64', " +
+        "  decimal0_part='36.9', " +
+        "  decimal9_part='36.9', " +
+        "  decimal18_part='3289379872.945645', " +
+        "  decimal28_part='39579334534534.35345', " +
+        "  decimal38_part='363945093845093890.9', " +
+        "  double_part='8.345', " +
+        "  float_part='4.67', " +
+        "  int_part='123456', " +
+        "  bigint_part='234235', " +
+        "  smallint_part='3455', " +
+        "  string_part='string', " +
+        "  varchar_part='varchar', " +
+        "  timestamp_part='2013-07-05 17:01:00', " +
+        "  date_part='2013-07-05'" +
+        ") " +
+        " SELECT " +
+        "  boolean_field," +
+        "  tinyint_field," +
+        "  decimal0_field," +
+        "  decimal9_field," +
+        "  decimal18_field," +
+        "  decimal28_field," +
+        "  decimal38_field," +
+        "  double_field," +
+        "  float_field," +
+        "  int_field," +
+        "  bigint_field," +
+        "  smallint_field," +
+        "  string_field," +
+        "  varchar_field," +
+        "  timestamp_field" +
+        " FROM readtest WHERE boolean_part = true");
+
+    // Add a second partition to table 'readtest_parquet' which contains the same values as the first partition,
+    // except for the boolean_part partition column
+    executeQuery(hiveDriver,
+        "ALTER TABLE readtest_parquet ADD PARTITION ( " +
+            "  binary_part='binary', " +
+            "  boolean_part='false', " +
+            "  tinyint_part='64', " +
+            "  decimal0_part='36.9', " +
+            "  decimal9_part='36.9', " +
+            "  decimal18_part='3289379872.945645', " +
+            "  decimal28_part='39579334534534.35345', " +
+            "  decimal38_part='363945093845093890.9', " +
+            "  double_part='8.345', " +
+            "  float_part='4.67', " +
+            "  int_part='123456', " +
+            "  bigint_part='234235', " +
+            "  smallint_part='3455', " +
+            "  string_part='string', " +
+            "  varchar_part='varchar', " +
+            "  timestamp_part='2013-07-05 17:01:00', " +
+            "  date_part='2013-07-05')"
+    );
+
     // create a Hive view to test how its metadata is populated in Drill's INFORMATION_SCHEMA
     executeQuery(hiveDriver, "CREATE VIEW IF NOT EXISTS hiveview AS SELECT * FROM kv");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 66f9f03..d54a777 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -158,6 +158,12 @@ public interface ExecConstants {
   public static String MONGO_READER_READ_NUMBERS_AS_DOUBLE = "store.mongo.read_numbers_as_double";
   public static OptionValidator MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(MONGO_READER_READ_NUMBERS_AS_DOUBLE, false);
 
+  // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to
+  // declare them in core, which is not right. Move this option and the two mongo plugin options above once we have
+  // that feature.
+  public static String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";
+  public static OptionValidator HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR =
+      new BooleanValidator(HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, false);
+
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final long SLICE_TARGET_DEFAULT = 100000l;
   public static final OptionValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, SLICE_TARGET_DEFAULT);
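
The new option defaults to false (see the BooleanValidator above) and, once its validator is registered in
SystemOptionManager (next hunk), it should be settable at system or session scope like any other boolean option.
A minimal sketch, assuming a standard Drill session:

    ALTER SYSTEM SET `store.hive.optimize_scan_with_native_readers` = true;
    -- verify the current value
    SELECT * FROM sys.options WHERE name = 'store.hive.optimize_scan_with_native_readers';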

http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5838bd1..c58bc08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -93,6 +93,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
       ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
+      ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR,
       ExecConstants.SLICE_TARGET_OPTION,
       ExecConstants.AFFINITY_FACTOR,
       ExecConstants.MAX_WIDTH_GLOBAL,