Posted to commits@drill.apache.org by ve...@apache.org on 2015/09/29 23:35:49 UTC
drill git commit: DRILL-3209: Add support for reading Hive parquet tables using Drill native parquet reader
Repository: drill
Updated Branches:
refs/heads/master 32631bb97 -> f78ab8418
DRILL-3209: Add support for reading Hive parquet tables using Drill native parquet reader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f78ab841
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f78ab841
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f78ab841
Branch: refs/heads/master
Commit: f78ab84183e73216b76732f66f87ccf48e2340d3
Parents: 32631bb
Author: vkorukanti <ve...@gmail.com>
Authored: Fri Sep 25 10:52:08 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Sep 29 11:59:46 2015 -0700
----------------------------------------------------------------------
.../planner/sql/HivePartitionDescriptor.java | 4 +-
...onvertHiveParquetScanToDrillParquetScan.java | 292 +++++++++++++++++
.../store/hive/HiveDrillNativeParquetScan.java | 109 +++++++
.../hive/HiveDrillNativeParquetSubScan.java | 49 +++
.../hive/HiveDrillNativeScanBatchCreator.java | 192 +++++++++++
.../apache/drill/exec/store/hive/HiveScan.java | 24 +-
.../exec/store/hive/HiveStoragePlugin.java | 16 +-
.../drill/exec/store/hive/HiveSubScan.java | 24 +-
.../exec/store/hive/schema/DrillHiveTable.java | 2 +-
.../drill/exec/TestHivePartitionPruning.java | 20 ++
.../drill/exec/TestHiveProjectPushDown.java | 30 +-
.../apache/drill/exec/hive/TestHiveStorage.java | 317 +++++++++++++------
.../exec/hive/TestInfoSchemaOnHiveStorage.java | 1 +
.../exec/store/hive/HiveTestDataGenerator.java | 108 ++++++-
.../org/apache/drill/exec/ExecConstants.java | 6 +
.../server/options/SystemOptionManager.java | 1 +
16 files changed, 1065 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
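The rewrite is gated behind a new session option registered through ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS
(the tests below enable it explicitly and reset it to false afterwards, so it is off by default). A minimal usage sketch,
assuming the option key is `store.hive.optimize_scan_with_native_readers` (the string value itself is not shown in this diff):

  -- enable rewriting Hive parquet scans to Drill's native parquet reader
  ALTER SESSION SET `store.hive.optimize_scan_with_native_readers` = true;

  -- the physical plan should now show HiveDrillNativeParquetScan instead of the SerDe-based HiveScan
  EXPLAIN PLAN FOR SELECT * FROM hive.readtest_parquet;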
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 11c6455..5009bf1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -106,8 +106,8 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
}
HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions, origReadEntry.hiveConfigOverride);
- HiveScan newScan = new HiveScan(hiveScan.getUserName(), newReadEntry, hiveScan.storagePlugin, hiveScan.columns);
- return newScan;
+
+ return hiveScan.clone(newReadEntry);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
new file mode 100644
index 0000000..47700c9
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.logical;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a Hive scan to use Drill's native parquet reader instead of Hive's native reader. It also adds a
+ * project to convert/cast the output of Drill's native parquet reader to match the output expected from Hive's
+ * native reader.
+ */
+public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptimizerRule {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(ConvertHiveParquetScanToDrillParquetScan.class);
+
+ public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan();
+
+ private static final DrillSqlOperator INT96_TO_TIMESTAMP =
+ new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true);
+
+ private ConvertHiveParquetScanToDrillParquetScan() {
+ super(RelOptHelper.any(DrillScanRel.class), "ConvertHiveScanToHiveDrillNativeScan:Parquet");
+ }
+
+ /**
+ * Rule is matched when all of the following match:
+ * 1) GroupScan in the given DrillScanRel is a {@link HiveScan}
+ * 2) {@link HiveScan} is not already rewritten using Drill's native readers
+ * 3) InputFormat in the Hive table metadata and in the metadata of all partitions is the same:
+ * {@link MapredParquetInputFormat}
+ * 4) No error occurs while checking the above conditions; any error is logged as a warning.
+ *
+ * @param call
+ * @return True if the rule can be applied. False otherwise
+ */
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final DrillScanRel scanRel = (DrillScanRel) call.rel(0);
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+
+ if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) scanRel.getGroupScan()).isNativeReader()) {
+ return false;
+ }
+
+ final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+ final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+
+ final Class<? extends InputFormat> tableInputFormat = getInputFormatFromSD(hiveTable, hiveTable.getSd());
+ if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
+ return false;
+ }
+
+ final List<HivePartition> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers();
+ if (partitions == null) {
+ return true;
+ }
+
+ // Make sure all partitions have the same input format as the table input format
+ for (HivePartition partition : partitions) {
+ Class<? extends InputFormat> inputFormat = getInputFormatFromSD(hiveTable, partition.getPartition().getSd());
+ if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Get the input format from given {@link StorageDescriptor}
+ * @param hiveTable
+ * @param sd
+ * @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
+ */
+ private Class<? extends InputFormat> getInputFormatFromSD(final Table hiveTable, final StorageDescriptor sd) {
+ try {
+ return (Class<? extends InputFormat>) Class.forName(sd.getInputFormat());
+ } catch (ReflectiveOperationException e) {
+ logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
+ hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ try {
+ final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+ final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
+
+ final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+ checkForUnsupportedDataTypes(hiveTable);
+
+ final Map<String, String> partitionColMapping =
+ getPartitionColMapping(hiveTable, partitionColumnLabel);
+
+ final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel);
+ final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
+
+ call.transformTo(projectRel);
+ } catch (final Exception e) {
+ logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e);
+ }
+ }
+
+ /**
+ * Create a mapping from Hive partition columns to directory columns (partitionColumnLabel + index).
+ */
+ private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
+ final Map<String, String> partitionColMapping = Maps.newHashMap();
+ int i = 0;
+ for (FieldSchema col : hiveTable.getPartitionKeys()) {
+ partitionColMapping.put(col.getName(), partitionColumnLabel+i);
+ i++;
+ }
+
+ return partitionColMapping;
+ }
+
+ /**
+ * Helper method which creates a DrillScanRel containing the native HiveDrillNativeParquetScan.
+ */
+ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
+ final DrillScanRel hiveScanRel) throws Exception{
+
+ final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
+ final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+
+ final List<String> nativeScanColNames = Lists.newArrayList();
+ final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+ for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
+ final String dirColName = partitionColMapping.get(field.getName());
+ if (dirColName != null) { // partition column
+ nativeScanColNames.add(dirColName);
+ nativeScanColTypes.add(varCharType);
+ } else {
+ nativeScanColNames.add(field.getName());
+ nativeScanColTypes.add(field.getType());
+ }
+ }
+
+ final RelDataType nativeScanRowType = typeFactory.createStructType(nativeScanColTypes, nativeScanColNames);
+
+ // Create the list of projected columns set in HiveScan. The order of this list may not be the same as the order of
+ // columns in the HiveScan row type. Note: if HiveScan.getColumns() contains a '*', we just add it as it is,
+ // unlike above where we expanded the '*'. HiveScan and the related subscan can handle '*'.
+ final List<SchemaPath> nativeScanCols = Lists.newArrayList();
+ for(SchemaPath colName : hiveScanRel.getColumns()) {
+ final String partitionCol = partitionColMapping.get(colName.getAsUnescapedPath());
+ if (partitionCol != null) {
+ nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
+ } else {
+ nativeScanCols.add(colName);
+ }
+ }
+
+ final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
+ final HiveDrillNativeParquetScan nativeHiveScan =
+ new HiveDrillNativeParquetScan(
+ hiveScan.getUserName(),
+ hiveScan.hiveReadEntry,
+ hiveScan.storagePlugin,
+ nativeScanCols);
+
+ return new DrillScanRel(
+ hiveScanRel.getCluster(),
+ hiveScanRel.getTraitSet(),
+ hiveScanRel.getTable(),
+ nativeHiveScan,
+ nativeScanRowType,
+ nativeScanCols);
+ }
+
+ /**
+ * Create a project that converts the native scan output to the output expected from the Hive scan.
+ */
+ private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
+ final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
+
+ final List<RexNode> rexNodes = Lists.newArrayList();
+ final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
+ final RelDataType hiveScanRowType = hiveScanRel.getRowType();
+
+ for (String colName : hiveScanRowType.getFieldNames()) {
+ final String dirColName = partitionColMapping.get(colName);
+ if (dirColName != null) {
+ rexNodes.add(createPartitionColumnCast(hiveScanRel, nativeScanRel, colName, dirColName, rb));
+ } else {
+ rexNodes.add(createColumnFormatConversion(hiveScanRel, nativeScanRel, colName, rb));
+ }
+ }
+
+ return DrillProjectRel.create(
+ hiveScanRel.getCluster(), hiveScanRel.getTraitSet(), nativeScanRel, rexNodes,
+ hiveScanRowType /* project rowtype and HiveScanRel rowtype should be the same */);
+ }
+
+ /**
+ * Apply any data format conversion expressions.
+ */
+ private RexNode createColumnFormatConversion(final DrillScanRel hiveScanRel, final DrillScanRel nativeScanRel,
+ final String colName, final RexBuilder rb) {
+
+ final RelDataType outputType = hiveScanRel.getRowType().getField(colName, false, false).getType();
+ final RelDataTypeField inputField = nativeScanRel.getRowType().getField(colName, false, false);
+ final RexInputRef inputRef = rb.makeInputRef(inputField.getType(), inputField.getIndex());
+
+ if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP) {
+ // TIMESTAMP is stored as INT96 by Hive in Parquet format. Use the convert_fromTIMESTAMP_IMPALA UDF to convert
+ // the INT96 data to TIMESTAMP
+ return rb.makeCall(INT96_TO_TIMESTAMP, inputRef);
+ }
+
+ return inputRef;
+ }
+
+ /**
+ * Create a cast for a partition column. Partition columns are output as "VARCHAR" by the native parquet reader.
+ * Cast them to the appropriate type according to the partition type in the HiveScan.
+ */
+ private RexNode createPartitionColumnCast(final DrillScanRel hiveScanRel, final DrillScanRel nativeScanRel,
+ final String outputColName, final String dirColName, final RexBuilder rb) {
+
+ final RelDataType outputType = hiveScanRel.getRowType().getField(outputColName, false, false).getType();
+ final RelDataTypeField inputField = nativeScanRel.getRowType().getField(dirColName, false, false);
+ final RexInputRef inputRef =
+ rb.makeInputRef(rb.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), inputField.getIndex());
+
+ return rb.makeCast(outputType, inputRef);
+ }
+
+ private void checkForUnsupportedDataTypes(final Table hiveTable) {
+ for(FieldSchema hiveField : hiveTable.getSd().getCols()) {
+ final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
+ if (category == Category.MAP ||
+ category == Category.STRUCT ||
+ category == Category.UNION ||
+ category == Category.LIST) {
+ HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString());
+ }
+ }
+ }
+}
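For a concrete picture of the rewrite above (grounded in the projectPushDownOnHiveParquetTable test further below; the
session option key is again assumed to be the one registered for ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS):

  ALTER SESSION SET `store.hive.optimize_scan_with_native_readers` = true;
  SELECT boolean_field, boolean_part, int_field, int_part FROM hive.readtest_parquet;

  -- The native scan reads the partition columns as directory columns (VARCHAR), so the pushed-down
  -- column list becomes [`boolean_field`, `dir1`, `int_field`, `dir10`]. The Project the rule places
  -- on top casts dir1/dir10 back to the Hive partition types (BOOLEAN/INT here), and TIMESTAMP
  -- columns are wrapped in convert_fromTIMESTAMP_IMPALA to decode Hive's INT96 encoding.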
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
new file mode 100644
index 0000000..4d495da
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Extension of {@link HiveScan} which supports reading Hive tables using Drill's native parquet reader.
+ */
+@JsonTypeName("hive-drill-native-parquet-scan")
+public class HiveDrillNativeParquetScan extends HiveScan {
+
+ @JsonCreator
+ public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
+ @JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
+ @JsonProperty("storage-plugin") String storagePluginName,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
+ }
+
+ public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin,
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ super(userName, hiveReadEntry, storagePlugin, columns);
+ }
+
+ public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
+ super(hiveScan);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ final ScanStats nativeHiveScanStats = super.getScanStats();
+
+ // As Drill's native parquet record reader is faster and more memory efficient, divide the costs by a factor.
+ return new ScanStats(
+ nativeHiveScanStats.getGroupScanProperty(),
+ nativeHiveScanStats.getRecordCount()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
+ nativeHiveScanStats.getCpuCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
+ nativeHiveScanStats.getDiskCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR);
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ try {
+ return new HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public boolean isNativeReader() {
+ return true;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new HiveDrillNativeParquetScan(this);
+ }
+
+ @Override
+ public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
+ return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ final HiveDrillNativeParquetScan scan = new HiveDrillNativeParquetScan(this);
+ scan.columns = columns;
+ return scan;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper()
+ + ", inputSplits=" + inputSplits
+ + ", columns=" + columns
+ + ", partitions= " + hiveReadEntry.getHivePartitionWrappers() +"]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
new file mode 100644
index 0000000..b37b258
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Extension of {@link HiveSubScan} which supports reading Hive tables using Drill's native parquet reader.
+ */
+@JsonTypeName("hive-drill-native-parquet-sub-scan")
+public class HiveDrillNativeParquetSubScan extends HiveSubScan {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeParquetSubScan.class);
+
+ @JsonCreator
+ public HiveDrillNativeParquetSubScan(@JsonProperty("userName") String userName,
+ @JsonProperty("splits") List<String> splits,
+ @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
+ @JsonProperty("splitClasses") List<String> splitClasses,
+ @JsonProperty("columns") List<SchemaPath> columns)
+ throws IOException, ReflectiveOperationException {
+ super(userName, splits, hiveReadEntry, splitClasses, columns);
+ }
+
+ public HiveDrillNativeParquetSubScan(final HiveSubScan subScan) throws IOException, ReflectiveOperationException {
+ this(subScan.getUserName(), subScan.getSplits(), subScan.getHiveReadEntry(), subScan.getSplitClasses(),
+ subScan.getColumns());
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
new file mode 100644
index 0000000..d03a4bf
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+@SuppressWarnings("unused")
+public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
+
+ @Override
+ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ final Table table = config.getTable();
+ final List<InputSplit> splits = config.getInputSplits();
+ final List<Partition> partitions = config.getPartitions();
+ final List<SchemaPath> columns = config.getColumns();
+ final Map<String, String> hiveConfigOverride = config.getHiveReadEntry().hiveConfigOverride;
+ final String partitionDesignator = context.getOptions()
+ .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+
+ final boolean hasPartitions = (partitions != null && partitions.size() > 0);
+
+ final List<String[]> partitionColumns = Lists.newArrayList();
+ final List<Integer> selectedPartitionColumns = Lists.newArrayList();
+ List<SchemaPath> newColumns = columns;
+ if (AbstractRecordReader.isStarQuery(columns)) {
+ for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+ selectedPartitionColumns.add(i);
+ }
+ } else {
+ // Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
+ // ParquetRecordReader. Partition columns are passed to ScanBatch.
+ newColumns = Lists.newArrayList();
+ Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+ for (SchemaPath column : columns) {
+ Matcher m = pattern.matcher(column.getAsUnescapedPath());
+ if (m.matches()) {
+ selectedPartitionColumns.add(
+ Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+ } else {
+ newColumns.add(column);
+ }
+ }
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(config,
+ false /* ScanBatch is not subject to fragment memory limit */);
+
+ int currentPartitionIndex = 0;
+ boolean success = false;
+ final List<RecordReader> readers = Lists.newArrayList();
+
+ final Configuration conf = getConf(hiveConfigOverride);
+
+ // TODO: In the future we can get this cache from the metadata cached on the filesystem.
+ final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
+
+ try {
+ for (InputSplit split : splits) {
+ final FileSplit fileSplit = (FileSplit) split;
+ final Path finalPath = fileSplit.getPath();
+ final JobConf cloneJob =
+ new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
+ final FileSystem fs = finalPath.getFileSystem(cloneJob);
+
+ ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
+ if (parquetMetadata == null){
+ parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
+ footerCache.put(finalPath.toString(), parquetMetadata);
+ }
+ final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
+
+ for(int rowGroupNum : rowGroupNums) {
+ readers.add(new ParquetRecordReader(
+ context,
+ Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
+ rowGroupNum, fs,
+ new DirectCodecFactory(fs.getConf(), oContext.getAllocator()),
+ parquetMetadata,
+ newColumns)
+ );
+
+ if (hasPartitions) {
+ Partition p = partitions.get(currentPartitionIndex);
+ partitionColumns.add(p.getValues().toArray(new String[0]));
+ }
+ }
+ currentPartitionIndex++;
+ }
+ success = true;
+ } catch (final IOException e) {
+ throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
+ } finally {
+ if (!success) {
+ for(RecordReader reader : readers) {
+ AutoCloseables.close(reader, logger);
+ }
+ }
+ }
+
+ // If there are no readers created (which is possible when the table is empty or no row groups are matched),
+ // create an empty RecordReader to output the schema
+ if (readers.size() == 0) {
+ readers.add(new HiveRecordReader(table, null, null, columns, context, hiveConfigOverride));
+ }
+
+ return new ScanBatch(config, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
+ }
+
+ private Configuration getConf(final Map<String, String> hiveConfigOverride) {
+ final HiveConf hiveConf = new HiveConf();
+ for(Entry<String, String> prop : hiveConfigOverride.entrySet()) {
+ hiveConf.set(prop.getKey(), prop.getValue());
+ }
+
+ return hiveConf;
+ }
+
+ /**
+ * Get the list of row group numbers for the given file input split. The logic used here is the same as how Hive's
+ * parquet input format finds the row group numbers for an input split.
+ */
+ private List<Integer> getRowGroupNumbersFromFileSplit(final FileSplit split,
+ final ParquetMetadata footer) throws IOException {
+ final List<BlockMetaData> blocks = footer.getBlocks();
+
+ final long splitStart = split.getStart();
+ final long splitLength = split.getLength();
+
+ final List<Integer> rowGroupNums = Lists.newArrayList();
+
+ int i = 0;
+ for (final BlockMetaData block : blocks) {
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+ rowGroupNums.add(i);
+ }
+ i++;
+ }
+
+ return rowGroupNums;
+ }
+}
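A worked example of the row-group selection above: for a parquet file with two row groups whose first data pages start
at byte offsets 4 and 134217732, a file split covering bytes [0, 134217728) picks only row group 0 and the split
starting at 134217728 picks only row group 1, so each row group ends up with exactly one ParquetRecordReader, mirroring
how Hive's parquet input format assigns row groups to splits.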
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 9ada569..de800a7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -69,10 +69,12 @@ import org.apache.hadoop.security.UserGroupInformation;
public class HiveScan extends AbstractGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+ protected static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100;
+
@JsonProperty("hive-table")
public HiveReadEntry hiveReadEntry;
@JsonIgnore
- private List<InputSplit> inputSplits = Lists.newArrayList();
+ protected List<InputSplit> inputSplits = Lists.newArrayList();
@JsonIgnore
public HiveStoragePlugin storagePlugin;
@JsonProperty("storage-plugin")
@@ -121,7 +123,7 @@ public class HiveScan extends AbstractGroupScan {
this.storagePluginName = storagePlugin.getName();
}
- private HiveScan(final HiveScan that) {
+ public HiveScan(final HiveScan that) {
super(that);
this.columns = that.columns;
this.endpoints = that.endpoints;
@@ -134,6 +136,10 @@ public class HiveScan extends AbstractGroupScan {
this.rowCount = that.rowCount;
}
+ public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
+ return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns);
+ }
+
public List<SchemaPath> getColumns() {
return columns;
}
@@ -312,6 +318,15 @@ public class HiveScan extends AbstractGroupScan {
// having a rowCount of 0 can mean the statistics were never computed
estRowCount = data/1024;
}
+
+ // Hive's native reader is neither memory efficient nor fast. If the row count is below
+ // HIVE_SERDE_SCAN_OVERHEAD_FACTOR, raise it to at least HIVE_SERDE_SCAN_OVERHEAD_FACTOR to enable the planner
+ // to choose HiveDrillNativeParquetScan. Due to the project on top of HiveDrillNativeParquetScan, we would end up
+ // choosing the HiveScan instead of HiveDrillNativeParquetScan if the cost were too low.
+ if (estRowCount <= HIVE_SERDE_SCAN_OVERHEAD_FACTOR) {
+ estRowCount = HIVE_SERDE_SCAN_OVERHEAD_FACTOR;
+ }
+
logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
} catch (final IOException e) {
@@ -357,4 +372,9 @@ public class HiveScan extends AbstractGroupScan {
}
return true;
}
+
+ @JsonIgnore
+ public boolean isNativeReader() {
+ return false;
+ }
}
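To make the two cost changes above concrete: with HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100, a table whose statistics
suggest only 40 rows gets a HiveScan estimate floored to 100 rows, while the corresponding HiveDrillNativeParquetScan
reports 100/100 = 1 row and 1/100th of the CPU and disk cost, so the planner prefers the native scan even with the
extra Project the conversion rule adds on top.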
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 23aa37f..191a253 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -29,7 +29,9 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -92,9 +94,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
final String defaultPartitionValue = HiveUtilities.getDefaultPartitionValue(config.getHiveConfigOverride());
- return ImmutableSet.of(
- HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext, defaultPartitionValue),
- HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext, defaultPartitionValue));
+ ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
+
+ ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext, defaultPartitionValue));
+ ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext, defaultPartitionValue));
+
+ if(optimizerRulesContext.getPlannerSettings().getOptions()
+ .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) {
+ ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
+ }
+
+ return ruleBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 2181c2a..907539e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -19,17 +19,22 @@ package org.apache.drill.exec.store.hive;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.RecordReader;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.InputSplit;
@@ -45,21 +50,18 @@ import com.google.common.io.ByteStreams;
@JsonTypeName("hive-sub-scan")
public class HiveSubScan extends AbstractBase implements SubScan {
-
- private List<String> splits;
-
- private HiveReadEntry hiveReadEntry;
-
- private List<String> splitClasses;
-
- private List<SchemaPath> columns;
+ protected HiveReadEntry hiveReadEntry;
@JsonIgnore
- private List<InputSplit> inputSplits = Lists.newArrayList();
+ protected List<InputSplit> inputSplits = Lists.newArrayList();
@JsonIgnore
- private Table table;
+ protected Table table;
@JsonIgnore
- private List<Partition> partitions;
+ protected List<Partition> partitions;
+
+ private List<String> splits;
+ private List<String> splitClasses;
+ protected List<SchemaPath> columns;
@JsonCreator
public HiveSubScan(@JsonProperty("userName") String userName,
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index b459ee4..6583b9a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -104,7 +104,7 @@ public class DrillHiveTable extends DrillTable{
return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
case BINARY:
- return typeFactory.createSqlType(SqlTypeName.BINARY);
+ return typeFactory.createSqlType(SqlTypeName.VARBINARY);
case DECIMAL: {
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo)pTypeInfo;
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index f0b4bdc..9009334 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.apache.drill.exec.hive.HiveTestBase;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -112,6 +113,25 @@ public class TestHivePartitionPruning extends HiveTestBase {
assertFalse(plan.contains("Filter"));
}
+ @Test
+ public void pruneDataTypeSupportNativeReaders() throws Exception {
+ try {
+ test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ final String query = "EXPLAIN PLAN FOR " +
+ "SELECT * FROM hive.readtest_parquet WHERE boolean_part = true";
+
+ final String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+ // Check and make sure that Filter is not present in the plan
+ assertFalse(plan.contains("Filter"));
+
+ // Make sure the plan contains the Hive scan utilizing native parquet reader
+ assertTrue(plan.contains("groupscan=[HiveDrillNativeParquetScan"));
+ } finally {
+ test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ }
+ }
+
@Test // DRILL-3579
public void selectFromPartitionedTableWithNullPartitions() throws Exception {
final String query = "SELECT count(*) nullCount FROM hive.partition_pruning_test " +
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 6423a36..32f1682 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.apache.drill.exec.hive.HiveTestBase;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -38,8 +39,8 @@ public class TestHiveProjectPushDown extends HiveTestBase {
test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
}
- private void testHelper(String query, String expectedColNamesInPlan, int expectedRecordCount)throws Exception {
- testPhysicalPlan(query, expectedColNamesInPlan);
+ private void testHelper(String query, int expectedRecordCount, String... expectedSubstrs)throws Exception {
+ testPhysicalPlan(query, expectedSubstrs);
int actualRecordCount = testSql(query);
assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
@@ -51,7 +52,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String query = "SELECT `value` as v FROM hive.`default`.kv";
String expectedColNames = " \"columns\" : [ \"`value`\" ]";
- testHelper(query, expectedColNames, 5);
+ testHelper(query, 5, expectedColNames);
}
@Test
@@ -59,7 +60,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String query = "SELECT boolean_field as b_f, tinyint_field as ti_f FROM hive.`default`.readtest";
String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\" ]";
- testHelper(query, expectedColNames, 2);
+ testHelper(query, 2, expectedColNames);
}
@Test
@@ -67,7 +68,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String query = "SELECT double_part as dbl_p FROM hive.`default`.readtest";
String expectedColNames = " \"columns\" : [ \"`double_part`\" ]";
- testHelper(query, expectedColNames, 2);
+ testHelper(query, 2, expectedColNames);
}
@Test
@@ -75,7 +76,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String query = "SELECT double_part as dbl_p, decimal0_part as dec_p FROM hive.`default`.readtest";
String expectedColNames = " \"columns\" : [ \"`double_part`\", \"`decimal0_part`\" ]";
- testHelper(query, expectedColNames, 2);
+ testHelper(query, 2, expectedColNames);
}
@Test
@@ -85,7 +86,7 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String expectedColNames = " \"columns\" : [ \"`boolean_field`\", \"`tinyint_field`\", " +
"\"`double_part`\", \"`decimal0_part`\" ]";
- testHelper(query, expectedColNames, 2);
+ testHelper(query, 2, expectedColNames);
}
@Test
@@ -93,6 +94,19 @@ public class TestHiveProjectPushDown extends HiveTestBase {
String query = "SELECT * FROM hive.`default`.kv";
String expectedColNames = " \"columns\" : [ \"`key`\", \"`value`\" ]";
- testHelper(query, expectedColNames, 5);
+ testHelper(query, 5, expectedColNames);
+ }
+
+ @Test
+ public void projectPushDownOnHiveParquetTable() throws Exception {
+ try {
+ test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ String query = "SELECT boolean_field, boolean_part, int_field, int_part FROM hive.readtest_parquet";
+ String expectedColNames = "\"columns\" : [ \"`boolean_field`\", \"`dir1`\", \"`int_field`\", \"`dir10`\" ]";
+
+ testHelper(query, 2, expectedColNames, "hive-drill-native-parquet-scan");
+ } finally {
+ test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 9211af6..1112e8c 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -18,9 +18,12 @@
package org.apache.drill.exec.hive;
import com.google.common.collect.ImmutableMap;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.hadoop.fs.FileSystem;
import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.math.BigDecimal;
@@ -28,6 +31,11 @@ import java.sql.Date;
import java.sql.Timestamp;
public class TestHiveStorage extends HiveTestBase {
+ @BeforeClass
+ public static void setupOptions() throws Exception {
+ test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+ }
+
@Test
public void hiveReadWithDb() throws Exception {
test("select * from hive.kv");
@@ -48,7 +56,7 @@ public class TestHiveStorage extends HiveTestBase {
.unOrdered()
.baselineColumns("col1")
.baselineValues("binaryfield")
- .baselineValues(new Object[] { null })
+ .baselineValues(new Object[]{null})
.go();
}
@@ -59,105 +67,205 @@ public class TestHiveStorage extends HiveTestBase {
*/
@Test
public void readAllSupportedHiveDataTypes() throws Exception {
- try {
- // enable decimal type
- test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
-
- testBuilder().sqlQuery("SELECT * FROM hive.readtest")
- .unOrdered()
- .baselineColumns(
- "binary_field",
- "boolean_field",
- "tinyint_field",
- "decimal0_field",
- "decimal9_field",
- "decimal18_field",
- "decimal28_field",
- "decimal38_field",
- "double_field",
- "float_field",
- "int_field",
- "bigint_field",
- "smallint_field",
- "string_field",
- "varchar_field",
- "timestamp_field",
- "date_field",
- "binary_part",
- "boolean_part",
- "tinyint_part",
- "decimal0_part",
- "decimal9_part",
- "decimal18_part",
- "decimal28_part",
- "decimal38_part",
- "double_part",
- "float_part",
- "int_part",
- "bigint_part",
- "smallint_part",
- "string_part",
- "varchar_part",
- "timestamp_part",
- "date_part")
- .baselineValues(
- "binaryfield",
- false,
- 34,
- new BigDecimal("66"),
- new BigDecimal("2347.92"),
- new BigDecimal("2758725827.99990"),
- new BigDecimal("29375892739852.8"),
- new BigDecimal("89853749534593985.783"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "stringfield",
- "varcharfield",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- new DateTime(Date.valueOf("2013-07-05").getTime()),
- "binary",
- true,
- 64,
- new BigDecimal("37"),
- new BigDecimal("36.90"),
- new BigDecimal("3289379872.94565"),
- new BigDecimal("39579334534534.4"),
- new BigDecimal("363945093845093890.900"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "string",
- "varchar",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- new DateTime(Date.valueOf("2013-07-05").getTime()))
- .baselineValues( // All fields are null, but partition fields have non-null values
- null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
- "binary",
- true,
- 64,
- new BigDecimal("37"),
- new BigDecimal("36.90"),
- new BigDecimal("3289379872.94565"),
- new BigDecimal("39579334534534.4"),
- new BigDecimal("363945093845093890.900"),
- 8.345d,
- 4.67f,
- 123456,
- 234235L,
- 3455,
- "string",
- "varchar",
- new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
- new DateTime(Date.valueOf("2013-07-05").getTime()))
- .go();
- } finally {
- test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
- }
+ testBuilder().sqlQuery("SELECT * FROM hive.readtest")
+ .unOrdered()
+ .baselineColumns(
+ "binary_field",
+ "boolean_field",
+ "tinyint_field",
+ "decimal0_field",
+ "decimal9_field",
+ "decimal18_field",
+ "decimal28_field",
+ "decimal38_field",
+ "double_field",
+ "float_field",
+ "int_field",
+ "bigint_field",
+ "smallint_field",
+ "string_field",
+ "varchar_field",
+ "timestamp_field",
+ "date_field",
+ "binary_part",
+ "boolean_part",
+ "tinyint_part",
+ "decimal0_part",
+ "decimal9_part",
+ "decimal18_part",
+ "decimal28_part",
+ "decimal38_part",
+ "double_part",
+ "float_part",
+ "int_part",
+ "bigint_part",
+ "smallint_part",
+ "string_part",
+ "varchar_part",
+ "timestamp_part",
+ "date_part")
+ .baselineValues(
+ "binaryfield",
+ false,
+ 34,
+ new BigDecimal("66"),
+ new BigDecimal("2347.92"),
+ new BigDecimal("2758725827.99990"),
+ new BigDecimal("29375892739852.8"),
+ new BigDecimal("89853749534593985.783"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "stringfield",
+ "varcharfield",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ new DateTime(Date.valueOf("2013-07-05").getTime()),
+ "binary",
+ true,
+ 64,
+ new BigDecimal("37"),
+ new BigDecimal("36.90"),
+ new BigDecimal("3289379872.94565"),
+ new BigDecimal("39579334534534.4"),
+ new BigDecimal("363945093845093890.900"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "string",
+ "varchar",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ new DateTime(Date.valueOf("2013-07-05").getTime()))
+ .baselineValues( // All fields are null, but partition fields have non-null values
+ null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+ "binary",
+ true,
+ 64,
+ new BigDecimal("37"),
+ new BigDecimal("36.90"),
+ new BigDecimal("3289379872.94565"),
+ new BigDecimal("39579334534534.4"),
+ new BigDecimal("363945093845093890.900"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "string",
+ "varchar",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ new DateTime(Date.valueOf("2013-07-05").getTime()))
+ .go();
+ }
+
+ /**
+ * Test to ensure Drill reads all the supported types through the native Parquet readers.
+ * NOTE: As part of the Hive 1.2 upgrade, make sure this test and {@link #readAllSupportedHiveDataTypes()} are merged
+ * into one test.
+ */
+ @Test
+ public void readAllSupportedHiveDataTypesNativeParquet() throws Exception {
+ try {
+ test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ final String query = "SELECT * FROM hive.readtest_parquet";
+
+ // Make sure the plan has Hive scan with native parquet reader
+ testPhysicalPlan(query, "hive-drill-native-parquet-scan");
+
+ testBuilder().sqlQuery(query)
+ .unOrdered()
+ .baselineColumns(
+ "boolean_field",
+ "tinyint_field",
+ "decimal0_field",
+ "decimal9_field",
+ "decimal18_field",
+ "decimal28_field",
+ "decimal38_field",
+ "double_field",
+ "float_field",
+ "int_field",
+ "bigint_field",
+ "smallint_field",
+ "string_field",
+ "varchar_field",
+ "timestamp_field",
+ "binary_part",
+ "boolean_part",
+ "tinyint_part",
+ "decimal0_part",
+ "decimal9_part",
+ "decimal18_part",
+ "decimal28_part",
+ "decimal38_part",
+ "double_part",
+ "float_part",
+ "int_part",
+ "bigint_part",
+ "smallint_part",
+ "string_part",
+ "varchar_part",
+ "timestamp_part",
+ "date_part")
+ .baselineValues(
+ false,
+ 34,
+ new BigDecimal("66"),
+ new BigDecimal("2347.92"),
+ new BigDecimal("2758725827.99990"),
+ new BigDecimal("29375892739852.8"),
+ new BigDecimal("89853749534593985.783"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "stringfield",
+ "varcharfield",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ "binary",
+ true,
+ 64,
+ new BigDecimal("37"),
+ new BigDecimal("36.90"),
+ new BigDecimal("3289379872.94565"),
+ new BigDecimal("39579334534534.4"),
+ new BigDecimal("363945093845093890.900"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "string",
+ "varchar",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ new DateTime(Date.valueOf("2013-07-05").getTime()))
+ .baselineValues( // All fields are null, but partition fields have non-null values
+ null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+ "binary",
+ true,
+ 64,
+ new BigDecimal("37"),
+ new BigDecimal("36.90"),
+ new BigDecimal("3289379872.94565"),
+ new BigDecimal("39579334534534.4"),
+ new BigDecimal("363945093845093890.900"),
+ 8.345d,
+ 4.67f,
+ 123456,
+ 234235L,
+ 3455,
+ "string",
+ "varchar",
+ new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
+ new DateTime(Date.valueOf("2013-07-05").getTime()))
+ .go();
+ } finally {
+ test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+ }
}
@Test
@@ -190,8 +298,8 @@ public class TestHiveStorage extends HiveTestBase {
@Test // DRILL-745
public void queryingHiveAvroTable() throws Exception {
- testBuilder()
- .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
+ testBuilder()
+ .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
.unOrdered()
.baselineColumns("key", "value")
.baselineValues(5, " key_5")
@@ -218,4 +326,9 @@ public class TestHiveStorage extends HiveTestBase {
.baselineValues(1L)
.go();
}
+
+ @AfterClass
+ public static void shutdownOptions() throws Exception {
+ test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 6118be5..d203bd4 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -34,6 +34,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
.baselineValues("hive.default", "partition_pruning_test")
.baselineValues("hive.default", "readtest")
+ .baselineValues("hive.default", "readtest_parquet")
.baselineValues("hive.default", "empty_table")
.baselineValues("hive.default", "infoschematest")
.baselineValues("hive.default", "hiveview")
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 34a7ed6..17a433f 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -253,7 +253,7 @@ public class HiveTestDataGenerator {
// Load data into table 'readtest'
executeQuery(hiveDriver,
- String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE default.readtest PARTITION (" +
+ String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.readtest PARTITION (" +
" binary_part='binary', " +
" boolean_part='true', " +
" tinyint_part='64', " +
@@ -295,6 +295,112 @@ public class HiveTestDataGenerator {
"uniontypeType UNIONTYPE<int, double, array<string>>)"
);
+ /**
+ * Create a PARQUET table with all supported types. In Hive 1.0.0, the Hive Parquet format doesn't support BINARY and
+ * DATE types. Once the Hive storage plugin is upgraded to Hive 1.2, replace the DDL following this comment with the
+ * single statement below.
+ *
+ * executeQuery(hiveDriver, "CREATE TABLE readtest_parquet STORED AS parquet AS SELECT * FROM readtest");
+ */
+ executeQuery(hiveDriver,
+ "CREATE TABLE readtest_parquet (" +
+ " boolean_field BOOLEAN, " +
+ " tinyint_field TINYINT," +
+ " decimal0_field DECIMAL," +
+ " decimal9_field DECIMAL(6, 2)," +
+ " decimal18_field DECIMAL(15, 5)," +
+ " decimal28_field DECIMAL(23, 1)," +
+ " decimal38_field DECIMAL(30, 3)," +
+ " double_field DOUBLE," +
+ " float_field FLOAT," +
+ " int_field INT," +
+ " bigint_field BIGINT," +
+ " smallint_field SMALLINT," +
+ " string_field STRING," +
+ " varchar_field VARCHAR(50)," +
+ " timestamp_field TIMESTAMP" +
+ ") PARTITIONED BY (" +
+ " binary_part BINARY," +
+ " boolean_part BOOLEAN," +
+ " tinyint_part TINYINT," +
+ " decimal0_part DECIMAL," +
+ " decimal9_part DECIMAL(6, 2)," +
+ " decimal18_part DECIMAL(15, 5)," +
+ " decimal28_part DECIMAL(23, 1)," +
+ " decimal38_part DECIMAL(30, 3)," +
+ " double_part DOUBLE," +
+ " float_part FLOAT," +
+ " int_part INT," +
+ " bigint_part BIGINT," +
+ " smallint_part SMALLINT," +
+ " string_part STRING," +
+ " varchar_part VARCHAR(50)," +
+ " timestamp_part TIMESTAMP," +
+ " date_part DATE" +
+ ") STORED AS parquet "
+ );
+
+ executeQuery(hiveDriver, "INSERT OVERWRITE TABLE readtest_parquet " +
+ "PARTITION (" +
+ " binary_part='binary', " +
+ " boolean_part='true', " +
+ " tinyint_part='64', " +
+ " decimal0_part='36.9', " +
+ " decimal9_part='36.9', " +
+ " decimal18_part='3289379872.945645', " +
+ " decimal28_part='39579334534534.35345', " +
+ " decimal38_part='363945093845093890.9', " +
+ " double_part='8.345', " +
+ " float_part='4.67', " +
+ " int_part='123456', " +
+ " bigint_part='234235', " +
+ " smallint_part='3455', " +
+ " string_part='string', " +
+ " varchar_part='varchar', " +
+ " timestamp_part='2013-07-05 17:01:00', " +
+ " date_part='2013-07-05'" +
+ ") " +
+ " SELECT " +
+ " boolean_field," +
+ " tinyint_field," +
+ " decimal0_field," +
+ " decimal9_field," +
+ " decimal18_field," +
+ " decimal28_field," +
+ " decimal38_field," +
+ " double_field," +
+ " float_field," +
+ " int_field," +
+ " bigint_field," +
+ " smallint_field," +
+ " string_field," +
+ " varchar_field," +
+ " timestamp_field" +
+ " FROM readtest WHERE boolean_part = true");
+
+ // Add a second partition to table 'readtest_parquet' which contains the same values as the first partition except
+ // for the boolean_part partition column
+ executeQuery(hiveDriver,
+ "ALTER TABLE readtest_parquet ADD PARTITION ( " +
+ " binary_part='binary', " +
+ " boolean_part='false', " +
+ " tinyint_part='64', " +
+ " decimal0_part='36.9', " +
+ " decimal9_part='36.9', " +
+ " decimal18_part='3289379872.945645', " +
+ " decimal28_part='39579334534534.35345', " +
+ " decimal38_part='363945093845093890.9', " +
+ " double_part='8.345', " +
+ " float_part='4.67', " +
+ " int_part='123456', " +
+ " bigint_part='234235', " +
+ " smallint_part='3455', " +
+ " string_part='string', " +
+ " varchar_part='varchar', " +
+ " timestamp_part='2013-07-05 17:01:00', " +
+ " date_part='2013-07-05')"
+ );
+
// create a Hive view to test how its metadata is populated in Drill's INFORMATION_SCHEMA
executeQuery(hiveDriver, "CREATE VIEW IF NOT EXISTS hiveview AS SELECT * FROM kv");
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 66f9f03..d54a777 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -158,6 +158,12 @@ public interface ExecConstants {
public static String MONGO_READER_READ_NUMBERS_AS_DOUBLE = "store.mongo.read_numbers_as_double";
public static OptionValidator MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(MONGO_READER_READ_NUMBERS_AS_DOUBLE, false);
+ // TODO: We need to add a feature that enables storage plugins to register their own options. Currently we have to
+ // declare them in core, which is not right. Move this option and the two Mongo plugin options above once we have
+ // the feature.
+ public static String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";
+ public static OptionValidator HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR =
+ new BooleanValidator(HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, false);
+
public static final String SLICE_TARGET = "planner.slice_target";
public static final long SLICE_TARGET_DEFAULT = 100000l;
public static final OptionValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, SLICE_TARGET_DEFAULT);
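Because the validator is registered with SystemOptionManager below, the option can also be toggled at system scope rather than per session. A minimal sketch using standard Drill option syntax (not part of this patch):

    alter system set `store.hive.optimize_scan_with_native_readers` = true;
    select * from sys.options where name = 'store.hive.optimize_scan_with_native_readers'; -- defaults to false per the BooleanValidator above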
http://git-wip-us.apache.org/repos/asf/drill/blob/f78ab841/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5838bd1..c58bc08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -93,6 +93,7 @@ public class SystemOptionManager extends BaseOptionManager {
ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
+ ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR,
ExecConstants.SLICE_TARGET_OPTION,
ExecConstants.AFFINITY_FACTOR,
ExecConstants.MAX_WIDTH_GLOBAL,