Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/09 12:17:27 UTC
[drill] 01/04: DRILL-6575: Add store.hive.conf.properties option to allow setting Hive properties at session level
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 139f7156bcb2f6fef5b36f116c9c1b6095fc4b9c
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Thu Jul 5 20:48:37 2018 +0000
DRILL-6575: Add store.hive.conf.properties option to allow setting Hive properties at session level
closes #1365
---
...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 6 +-
.../ConvertHiveParquetScanToDrillParquetScan.java | 21 ++--
.../hive/HiveDrillNativeParquetRowGroupScan.java | 17 +++-
.../store/hive/HiveDrillNativeParquetScan.java | 20 +++-
.../org/apache/drill/exec/store/hive/HiveScan.java | 105 ++++++++++----------
.../drill/exec/store/hive/HiveStoragePlugin.java | 57 ++++++++---
.../exec/store/hive/HiveStoragePluginConfig.java | 21 ++--
.../apache/drill/exec/store/hive/HiveSubScan.java | 22 +++--
.../drill/exec/store/hive/HiveUtilities.java | 108 ++++++++++++++++-----
.../main/resources/bootstrap-storage-plugins.json | 2 +-
.../exec/TestHiveDrillNativeParquetReader.java | 17 ++++
.../apache/drill/exec/hive/TestHiveStorage.java | 21 ++++
.../exec/hive/TestInfoSchemaOnHiveStorage.java | 2 +
.../exec/store/hive/HiveTestDataGenerator.java | 80 ++++++++-------
.../java/org/apache/drill/exec/ExecConstants.java | 3 +
.../org/apache/drill/exec/opt/BasicOptimizer.java | 53 +++++-----
.../drill/exec/planner/logical/DrillTable.java | 11 ++-
.../drill/exec/planner/sql/SqlConverter.java | 34 +++----
.../exec/server/options/SystemOptionManager.java | 43 ++++----
.../drill/exec/store/AbstractStoragePlugin.java | 12 +++
.../org/apache/drill/exec/store/StoragePlugin.java | 45 ++++++---
.../java-exec/src/main/resources/drill-module.conf | 4 +
22 files changed, 457 insertions(+), 247 deletions(-)
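
Before the per-file diffs, a brief sketch of the mechanism this patch introduces may help readers of the change. HiveStoragePlugin.getPhysicalScan now reads the store.hive.conf.properties session option, unescapes it with StringEscapeUtils.unescapeJava, parses it as a java.util.Properties string (one key=value pair per line) into a Map<String, String>, and passes that map down through the scan operators, where HiveUtilities.generateHiveConf layers it on top of the plugin-level HiveConf. The self-contained Java sketch below mirrors that parse-and-merge logic outside of Drill; the class name HiveConfOverrideSketch and the plain Map standing in for HiveConf are illustrative assumptions, not part of the patch.

import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative sketch (hypothetical class name): mirrors how the patch turns the
// store.hive.conf.properties session option into per-scan Hive config overrides.
public class HiveConfOverrideSketch {

  // Parse the option value: newline-separated key=value pairs, read via java.util.Properties,
  // as done in HiveStoragePlugin.getPhysicalScan in this patch.
  static Map<String, String> parseConfProperties(String optionValue) {
    Map<String, String> confProperties = new LinkedHashMap<>();
    Properties properties = new Properties();
    try {
      properties.load(new StringReader(optionValue));
    } catch (IOException e) {
      // The patch logs a warning and ignores unparsable values; do the same here.
      return confProperties;
    }
    for (String name : properties.stringPropertyNames()) {
      confProperties.put(name, properties.getProperty(name));
    }
    return confProperties;
  }

  // Merge session-level overrides on top of plugin-level config, last writer wins,
  // analogous to HiveUtilities.generateHiveConf(HiveConf, Map) (a plain Map stands in for HiveConf).
  static Map<String, String> mergeWithPluginConf(Map<String, String> pluginConf,
                                                 Map<String, String> confProperties) {
    Map<String, String> merged = new LinkedHashMap<>(pluginConf);
    merged.putAll(confProperties);
    return merged;
  }

  public static void main(String[] args) {
    // Same value the new tests set via alterSession(ExecConstants.HIVE_CONF_PROPERTIES, ...) below.
    String optionValue = "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true";
    Map<String, String> overrides = parseConfProperties(optionValue);
    System.out.println(overrides);
    // {hive.mapred.supports.subdirectories=true, mapred.input.dir.recursive=true}
  }
}

At the SQL level the same value would be supplied through the new session option, e.g. a statement along the lines of ALTER SESSION SET `store.hive.conf.properties` = 'hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true'; the exact SQL form is inferred from the option name and the unescapeJava call rather than shown verbatim in this patch.
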
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index 50fee9c..3bc33b3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -88,7 +88,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
- hiveScan.getStoragePlugin().getHiveConf());
+ hiveScan.getHiveConf());
if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
// table is empty, use original scan
return;
@@ -134,7 +134,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
.map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
.collect(Collectors.toList());
- JsonTableGroupScan nariveMapRDBScan =
+ JsonTableGroupScan nativeMapRDBScan =
new JsonTableGroupScan(
hiveScan.getUserName(),
hiveScan.getStoragePlugin(),
@@ -155,7 +155,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
hiveScanRel.getCluster(),
hiveScanRel.getTraitSet(),
hiveScanRel.getTable(),
- nariveMapRDBScan,
+ nativeMapRDBScan,
nativeScanRowType,
hiveScanCols);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 2a2f4fb..ea71157 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.planner.sql.logical;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -43,6 +41,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -88,7 +88,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
- final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+ final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf());
final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
if (logicalInputSplits.isEmpty()) {
@@ -123,7 +123,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
* Create mapping of Hive partition column to directory column mapping.
*/
private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
- final Map<String, String> partitionColMapping = Maps.newHashMap();
+ final Map<String, String> partitionColMapping = new HashMap<>();
int i = 0;
for (FieldSchema col : hiveTable.getPartitionKeys()) {
partitionColMapping.put(col.getName(), partitionColumnLabel+i);
@@ -143,8 +143,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
- final List<String> nativeScanColNames = Lists.newArrayList();
- final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+ final List<String> nativeScanColNames = new ArrayList<>();
+ final List<RelDataType> nativeScanColTypes = new ArrayList<>();
for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
final String dirColName = partitionColMapping.get(field.getName());
if (dirColName != null) { // partition column
@@ -161,8 +161,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
// Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of
// columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is,
// unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'.
- final List<SchemaPath> nativeScanCols = Lists.newArrayList();
- for(SchemaPath colName : hiveScanRel.getColumns()) {
+ final List<SchemaPath> nativeScanCols = new ArrayList<>();
+ for (SchemaPath colName : hiveScanRel.getColumns()) {
final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath());
if (partitionCol != null) {
nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
@@ -177,7 +177,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
hiveScan.getUserName(),
nativeScanCols,
hiveScan.getStoragePlugin(),
- logicalInputSplits);
+ logicalInputSplits,
+ hiveScan.getConfProperties());
return new DrillScanRel(
hiveScanRel.getCluster(),
@@ -194,7 +195,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
- final List<RexNode> rexNodes = Lists.newArrayList();
+ final List<RexNode> rexNodes = new ArrayList<>();
final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
final RelDataType hiveScanRowType = hiveScanRel.getRowType();
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index e227015..d334ec8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
@JsonTypeName("hive-drill-native-parquet-row-group-scan")
public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan {
@@ -45,6 +46,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
private final HiveStoragePlugin hiveStoragePlugin;
private final HiveStoragePluginConfig hiveStoragePluginConfig;
private final HivePartitionHolder hivePartitionHolder;
+ private final Map<String, String> confProperties;
@JsonCreator
public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -53,12 +55,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
@JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+ @JsonProperty("confProperties") Map<String, String> confProperties,
@JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
this(userName,
(HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
rowGroupReadEntries,
columns,
hivePartitionHolder,
+ confProperties,
filter);
}
@@ -67,11 +71,13 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
List<RowGroupReadEntry> rowGroupReadEntries,
List<SchemaPath> columns,
HivePartitionHolder hivePartitionHolder,
+ Map<String, String> confProperties,
LogicalExpression filter) {
super(userName, rowGroupReadEntries, columns, filter);
this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
this.hivePartitionHolder = hivePartitionHolder;
+ this.confProperties = confProperties;
}
@JsonProperty
@@ -84,6 +90,11 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
return hivePartitionHolder;
}
+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@JsonIgnore
public HiveStoragePlugin getHiveStoragePlugin() {
return hiveStoragePlugin;
@@ -92,7 +103,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
}
@Override
@@ -102,7 +113,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
@Override
public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
}
@Override
@@ -114,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
Path path = new Path(rowGroupReadEntry.getPath()).getParent();
return new ProjectionPusher().pushProjectionsAndFilters(
- new JobConf(hiveStoragePlugin.getHiveConf()),
+ new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
path.getParent());
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 03a80d3..a973fa1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -62,7 +62,8 @@ import java.util.Map;
public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
private final HiveStoragePlugin hiveStoragePlugin;
- private HivePartitionHolder hivePartitionHolder;
+ private final HivePartitionHolder hivePartitionHolder;
+ private final Map<String, String> confProperties;
@JsonCreator
public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -71,10 +72,12 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("entries") List<ReadEntryWithPath> entries,
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+ @JsonProperty("confProperties") Map<String, String> confProperties,
@JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
this.hivePartitionHolder = hivePartitionHolder;
+ this.confProperties = confProperties;
init();
}
@@ -82,19 +85,22 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
public HiveDrillNativeParquetScan(String userName,
List<SchemaPath> columns,
HiveStoragePlugin hiveStoragePlugin,
- List<LogicalInputSplit> logicalInputSplits) throws IOException {
- this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
+ List<LogicalInputSplit> logicalInputSplits,
+ Map<String, String> confProperties) throws IOException {
+ this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
}
public HiveDrillNativeParquetScan(String userName,
List<SchemaPath> columns,
HiveStoragePlugin hiveStoragePlugin,
List<LogicalInputSplit> logicalInputSplits,
+ Map<String, String> confProperties,
LogicalExpression filter) throws IOException {
super(userName, columns, new ArrayList<>(), filter);
this.hiveStoragePlugin = hiveStoragePlugin;
this.hivePartitionHolder = new HivePartitionHolder();
+ this.confProperties = confProperties;
for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
@@ -122,6 +128,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
super(that);
this.hiveStoragePlugin = that.hiveStoragePlugin;
this.hivePartitionHolder = that.hivePartitionHolder;
+ this.confProperties = that.confProperties;
}
@JsonProperty
@@ -134,6 +141,11 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
return hivePartitionHolder;
}
+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@Override
public SubScan getSpecificScan(int minorFragmentId) {
List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
@@ -142,7 +154,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
List<String> values = hivePartitionHolder.get(readEntry.getPath());
subPartitionHolder.add(readEntry.getPath(), values);
}
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
}
@Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 11d47f3..d631740 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -50,8 +50,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
@@ -64,6 +62,7 @@ public class HiveScan extends AbstractGroupScan {
private final HiveStoragePlugin hiveStoragePlugin;
private final HiveReadEntry hiveReadEntry;
private final HiveMetadataProvider metadataProvider;
+ private final Map<String, String> confProperties;
private List<List<LogicalInputSplit>> mappings;
private List<LogicalInputSplit> inputSplits;
@@ -75,22 +74,24 @@ public class HiveScan extends AbstractGroupScan {
@JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
@JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig,
@JsonProperty("columns") final List<SchemaPath> columns,
+ @JsonProperty("confProperties") final Map<String, String> confProperties,
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this(userName,
hiveReadEntry,
(HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
columns,
- null);
+ null, confProperties);
}
public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
- final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
+ final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.hiveStoragePlugin = hiveStoragePlugin;
+ this.confProperties = confProperties;
if (metadataProvider == null) {
- this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf());
+ this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
} else {
this.metadataProvider = metadataProvider;
}
@@ -102,10 +103,11 @@ public class HiveScan extends AbstractGroupScan {
this.hiveReadEntry = that.hiveReadEntry;
this.hiveStoragePlugin = that.hiveStoragePlugin;
this.metadataProvider = that.metadataProvider;
+ this.confProperties = that.confProperties;
}
public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+ return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
}
@JsonProperty
@@ -123,29 +125,37 @@ public class HiveScan extends AbstractGroupScan {
return columns;
}
+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@JsonIgnore
public HiveStoragePlugin getStoragePlugin() {
return hiveStoragePlugin;
}
- protected HiveMetadataProvider getMetadataProvider() {
- return metadataProvider;
+ @JsonIgnore
+ public HiveConf getHiveConf() {
+ return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
}
- private List<LogicalInputSplit> getInputSplits() {
- if (inputSplits == null) {
- inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
- }
-
- return inputSplits;
+ @JsonIgnore
+ public boolean isNativeReader() {
+ return false;
}
+ @Override
+ public boolean supportsPartitionFilterPushdown() {
+ List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+ return !(partitionKeys == null || partitionKeys.size() == 0);
+ }
@Override
public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = new ArrayList<>();
for (int i = 0; i < endpoints.size(); i++) {
- mappings.add(new ArrayList<LogicalInputSplit>());
+ mappings.add(new ArrayList<>());
}
final int count = endpoints.size();
final List<LogicalInputSplit> inputSplits = getInputSplits();
@@ -158,9 +168,9 @@ public class HiveScan extends AbstractGroupScan {
public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
final List<LogicalInputSplit> splits = mappings.get(minorFragmentId);
- List<HivePartitionWrapper> parts = Lists.newArrayList();
- final List<List<String>> encodedInputSplits = Lists.newArrayList();
- final List<String> splitTypes = Lists.newArrayList();
+ List<HivePartitionWrapper> parts = new ArrayList<>();
+ final List<List<String>> encodedInputSplits = new ArrayList<>();
+ final List<String> splitTypes = new ArrayList<>();
for (final LogicalInputSplit split : splits) {
final Partition splitPartition = split.getPartition();
if (splitPartition != null) {
@@ -176,7 +186,7 @@ public class HiveScan extends AbstractGroupScan {
}
final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
- return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin);
+ return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
@@ -192,7 +202,7 @@ public class HiveScan extends AbstractGroupScan {
final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) {
endpointMap.put(endpoint.getAddress(), endpoint);
- logger.debug("endpoing address: {}", endpoint.getAddress());
+ logger.debug("Endpoint address: {}", endpoint.getAddress());
}
final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
try {
@@ -204,7 +214,7 @@ public class HiveScan extends AbstractGroupScan {
for (final LogicalInputSplit split : inputSplits) {
final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
for (final String loc : split.getLocations()) {
- logger.debug("split location: {}", loc);
+ logger.debug("Split location: {}", loc);
final DrillbitEndpoint endpoint = endpointMap.get(loc);
if (endpoint != null) {
if (affinityMap.containsKey(endpoint)) {
@@ -218,13 +228,8 @@ public class HiveScan extends AbstractGroupScan {
} catch (final IOException e) {
throw new DrillRuntimeException(e);
}
- for (final DrillbitEndpoint ep : affinityMap.keySet()) {
- Preconditions.checkNotNull(ep);
- }
- for (final EndpointAffinity a : affinityMap.values()) {
- Preconditions.checkNotNull(a.getEndpoint());
- }
- return Lists.newArrayList(affinityMap.values());
+
+ return new ArrayList<>(affinityMap.values());
}
@Override
@@ -243,21 +248,8 @@ public class HiveScan extends AbstractGroupScan {
}
}
- protected int getSerDeOverheadFactor() {
- final int projectedColumnCount;
- if (Utilities.isStarQuery(columns)) {
- Table hiveTable = hiveReadEntry.getTable();
- projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
- } else {
- // In cost estimation, # of project columns should be >= 1, even for skipAll query.
- projectedColumnCount = Math.max(columns.size(), 1);
- }
-
- return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
- }
-
@Override
- public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) {
return new HiveScan(this);
}
@@ -275,6 +267,7 @@ public class HiveScan extends AbstractGroupScan {
+ ", numPartitions=" + numPartitions
+ ", partitions= " + partitions
+ ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
+ + ", confProperties=" + confProperties
+ "]";
}
@@ -290,22 +283,24 @@ public class HiveScan extends AbstractGroupScan {
return true;
}
- // Return true if the current table is partitioned false otherwise
- public boolean supportsPartitionFilterPushdown() {
- final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
- if (partitionKeys == null || partitionKeys.size() == 0) {
- return false;
+ private List<LogicalInputSplit> getInputSplits() {
+ if (inputSplits == null) {
+ inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
}
- return true;
- }
- @JsonIgnore
- public HiveConf getHiveConf() {
- return hiveStoragePlugin.getHiveConf();
+ return inputSplits;
}
- @JsonIgnore
- public boolean isNativeReader() {
- return false;
+ private int getSerDeOverheadFactor() {
+ final int projectedColumnCount;
+ if (Utilities.isStarQuery(columns)) {
+ Table hiveTable = hiveReadEntry.getTable();
+ projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
+ } else {
+ // In cost estimation, # of project columns should be >= 1, even for skipAll query.
+ projectedColumnCount = Math.max(columns.size(), 1);
+ }
+
+ return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
}
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index ced8b01..adf1348 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -18,27 +18,34 @@
package org.apache.drill.exec.store.hive;
import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan;
import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -53,17 +60,16 @@ import org.apache.thrift.transport.TTransportException;
public class HiveStoragePlugin extends AbstractStoragePlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
private final HiveStoragePluginConfig config;
private HiveSchemaFactory schemaFactory;
private final HiveConf hiveConf;
- public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name)
- throws ExecutionSetupException {
+ public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
super(context, name);
this.config = config;
- this.hiveConf = createHiveConf(config.getHiveConfigOverride());
+ this.hiveConf = HiveUtilities.generateHiveConf(config.getConfigProps());
this.schemaFactory = new HiveSchemaFactory(this, name, hiveConf);
}
@@ -76,7 +82,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
+ public HiveScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+ }
+
+ @Override
public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ return getPhysicalScan(userName, selection, columns, null);
+ }
+
+ @Override
+ public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
try {
if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -84,7 +100,26 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
"Querying views created in Hive from Drill is not supported in current version.");
}
- return new HiveScan(userName, hiveReadEntry, this, columns, null);
+ Map<String, String> confProperties = new HashMap<>();
+ if (options != null) {
+ String value = StringEscapeUtils.unescapeJava(options.getString(ExecConstants.HIVE_CONF_PROPERTIES));
+ logger.trace("[{}] is set to {}.", ExecConstants.HIVE_CONF_PROPERTIES, value);
+ try {
+ Properties properties = new Properties();
+ properties.load(new StringReader(value));
+ confProperties =
+ properties.stringPropertyNames().stream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ properties::getProperty,
+ (o, n) -> n));
+ } catch (IOException e) {
+ logger.warn("Unable to parse Hive conf properties {}, ignoring them.", value);
+ }
+ }
+
+ return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}
@@ -181,14 +216,4 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
return ruleBuilder.build();
}
- private static HiveConf createHiveConf(final Map<String, String> hiveConfigOverride) {
- final HiveConf hiveConf = new HiveConf();
- for(Entry<String, String> config : hiveConfigOverride.entrySet()) {
- final String key = config.getKey();
- final String value = config.getValue();
- hiveConf.set(key, value);
- logger.trace("HiveConfig Override {}={}", key, value);
- }
- return hiveConf;
- }
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index b6f15c8..d812468 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -19,28 +19,31 @@ package org.apache.drill.exec.store.hive;
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.drill.common.logical.StoragePluginConfigBase;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName(HiveStoragePluginConfig.NAME)
public class HiveStoragePluginConfig extends StoragePluginConfigBase {
- @JsonProperty
- public Map<String, String> configProps;
public static final String NAME = "hive";
- @JsonIgnore
- public Map<String, String> getHiveConfigOverride() {
- return configProps;
- }
+ private final Map<String, String> configProps;
@JsonCreator
- public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
- this.configProps = props;
+ public HiveStoragePluginConfig(@JsonProperty("configProps")
+ // previously two names were allowed due to incorrectly written ser / der logic
+ // allowing to use both during deserialization for backward compatibility
+ @JsonAlias("config") Map<String, String> configProps) {
+ this.configProps = configProps;
+ }
+
+ @JsonProperty
+ public Map<String, String> getConfigProps() {
+ return configProps;
}
@Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 8ca8647..0acec2d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.collect.ImmutableSet;
@@ -55,6 +56,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
private final HiveTableWithColumnCache table;
private final List<HivePartition> partitions;
private final List<SchemaPath> columns;
+ private final Map<String, String> confProperties;
@JsonCreator
public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@@ -63,22 +65,24 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
@JsonProperty("splitClasses") List<String> splitClasses,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig)
+ @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
+ @JsonProperty("confProperties") Map<String, String> confProperties)
throws IOException, ExecutionSetupException, ReflectiveOperationException {
this(userName,
splits,
hiveReadEntry,
splitClasses,
columns,
- (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig));
+ (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties);
}
public HiveSubScan(final String userName,
final List<List<String>> splits,
final HiveReadEntry hiveReadEntry,
- final List<String> splitClasses,
+ final List<String> splitClasses,
final List<SchemaPath> columns,
- final HiveStoragePlugin hiveStoragePlugin)
+ final HiveStoragePlugin hiveStoragePlugin,
+ final Map<String, String> confProperties)
throws IOException, ReflectiveOperationException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
@@ -88,6 +92,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
this.splitClasses = splitClasses;
this.columns = columns;
this.hiveStoragePlugin = hiveStoragePlugin;
+ this.confProperties = confProperties;
for (int i = 0; i < splits.size(); i++) {
inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i)));
@@ -119,6 +124,11 @@ public class HiveSubScan extends AbstractBase implements SubScan {
return hiveStoragePlugin.getConfig();
}
+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@JsonIgnore
public HiveTableWithColumnCache getTable() {
return table;
@@ -141,7 +151,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@JsonIgnore
public HiveConf getHiveConf() {
- return hiveStoragePlugin.getHiveConf();
+ return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
}
@Override
@@ -152,7 +162,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
try {
- return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin);
+ return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c8efb65..6fc567e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -93,7 +93,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_
public class HiveUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
- /** Partition value is received in string format. Convert it into appropriate object based on the type. */
+ /**
+ * Partition value is received in string format. Convert it into appropriate object based on the type.
+ *
+ * @param typeInfo type info
+ * @param value partition values
+ * @param defaultPartitionValue default partition value
+ * @return converted object
+ */
public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
if (typeInfo.getCategory() != Category.PRIMITIVE) {
// In Hive only primitive types are allowed as partition column types.
@@ -147,6 +154,15 @@ public class HiveUtilities {
return null;
}
+ /**
+ * Populates vector with given value based on its type.
+ *
+ * @param vector vector instance
+ * @param managedBuffer Drill duffer
+ * @param val value
+ * @param start start position
+ * @param end end position
+ */
public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val,
final int start, final int end) {
TypeProtos.MinorType type = vector.getField().getType().getMinorType();
@@ -307,6 +323,13 @@ public class HiveUtilities {
}
}
+ /**
+ * Obtains major type from given type info holder.
+ *
+ * @param typeInfo type info holder
+ * @param options session options
+ * @return appropriate major type, null otherwise. For some types may throw unsupported exception.
+ */
public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
switch (typeInfo.getCategory()) {
case PRIMITIVE: {
@@ -343,8 +366,14 @@ public class HiveUtilities {
return null;
}
- public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
- OptionSet options) {
+ /**
+ * Obtains minor type from given primitive type info holder.
+ *
+ * @param primitiveTypeInfo primitive type info holder
+ * @param options session options
+ * @return appropriate minor type, otherwise throws unsupported type exception
+ */
+ public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, OptionSet options) {
switch(primitiveTypeInfo.getPrimitiveCategory()) {
case BINARY:
return TypeProtos.MinorType.VARBINARY;
@@ -392,10 +421,8 @@ public class HiveUtilities {
* @param job {@link JobConf} instance needed incase the table is StorageHandler based table.
* @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables).
* @param table Table object
- * @throws Exception
*/
- public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd,
- final Table table) throws Exception {
+ public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table) throws Exception {
final String inputFormatName = sd.getInputFormat();
if (Strings.isNullOrEmpty(inputFormatName)) {
final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
@@ -426,26 +453,23 @@ public class HiveUtilities {
}
/**
- * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table
- * to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
+ * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
+ * which also adds parameters from table to properties returned by that method.
*
* @param partition the source of partition level parameters
* @param table the source of table level parameters
* @return properties
*/
public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
- final Properties properties;
restoreColumns(table, partition);
- properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
// SerDe expects properties from Table, but above call doesn't add Table properties.
// Include Table properties in final list in order to not to break SerDes that depend on
// Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
- for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
- if (entry.getKey() != null && entry.getKey() != null) {
- properties.put(entry.getKey(), entry.getValue());
- }
- }
+ table.getParameters().entrySet().stream()
+ .filter(e -> e.getKey() != null && e.getValue() != null)
+ .forEach(e -> properties.put(e.getKey(), e.getValue()));
return properties;
}
@@ -453,8 +477,8 @@ public class HiveUtilities {
/**
* Sets columns from table cache to table and partition.
*
+ * @param table the source of column lists cache
* @param partition partition which will set column list
- * @param table the source of column lists cache
*/
public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
// exactly the same column lists for partitions or table
@@ -471,6 +495,9 @@ public class HiveUtilities {
* Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
* which also sets columns from table cache to table and returns properties returned by
* {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+ *
+ * @param table Hive table with cached columns
+ * @return Hive table metadata
*/
public static Properties getTableMetadata(HiveTableWithColumnCache table) {
restoreColumns(table, null);
@@ -478,13 +505,18 @@ public class HiveUtilities {
table.getDbName(), table.getTableName(), table.getPartitionKeys());
}
+ /**
+ * Generates unsupported types exception message with list of supported types
+ * and throws user exception.
+ *
+ * @param unsupportedType unsupported type
+ */
public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
- StringBuilder errMsg = new StringBuilder();
- errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));
- errMsg.append(System.getProperty("line.separator"));
- errMsg.append("Following Hive data types are supported in Drill for querying: ");
- errMsg.append(
- "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
+ StringBuilder errMsg = new StringBuilder()
+ .append("Unsupported Hive data type ").append(unsupportedType).append(". ")
+ .append(System.lineSeparator())
+ .append("Following Hive data types are supported in Drill for querying: ")
+ .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
throw UserException.unsupportedError()
.message(errMsg.toString())
@@ -633,7 +665,7 @@ public class HiveUtilities {
}
/**
- * Get the input format from given {@link StorageDescriptor}
+ * Get the input format from given {@link StorageDescriptor}.
*
* @param properties table properties
* @param hiveReadEntry hive read entry
@@ -681,5 +713,35 @@ public class HiveUtilities {
}
return false;
}
+
+ /**
+ * Creates HiveConf based on given list of configuration properties.
+ *
+ * @param properties config properties
+ * @return instance of HiveConf
+ */
+ public static HiveConf generateHiveConf(Map<String, String> properties) {
+ logger.trace("Override HiveConf with the following properties {}", properties);
+ HiveConf hiveConf = new HiveConf();
+ properties.forEach(hiveConf::set);
+ return hiveConf;
+ }
+
+ /**
+ * Creates HiveConf based on properties in given HiveConf and configuration properties.
+ *
+ * @param hiveConf hive conf
+ * @param properties config properties
+ * @return instance of HiveConf
+ */
+ public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> properties) {
+ Properties changedProperties = hiveConf.getChangedProperties();
+ changedProperties.putAll(properties);
+ HiveConf newHiveConf = new HiveConf();
+ changedProperties.stringPropertyNames()
+ .forEach(name -> newHiveConf.set(name, changedProperties.getProperty(name)));
+ return newHiveConf;
+ }
+
}
diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
index d06220f..018189c 100644
--- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,7 @@
"storage":{
hive : {
type:"hive",
- config : {
+ configProps : {
"hive.metastore.uris" : "",
"javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true",
"hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 556deb2..ea8d5df 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -152,6 +152,12 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
// checks only group scan
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native");
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext");
+ try {
+ alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+ } finally {
+ resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+ }
}
@Test
@@ -243,4 +249,15 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
}
+ @Test
+ public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+ String query = "select * from hive.sub_dir_table";
+ try {
+ alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+ test(query);
+ } finally {
+ resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+ }
+ }
+
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 25393e7..94f39b8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -412,6 +412,27 @@ public class TestHiveStorage extends HiveTestBase {
public void testPhysicalPlanSubmission() throws Exception {
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv");
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.readtest");
+ try {
+ alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+ } finally {
+ resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+ }
+ }
+
+ @Test
+ public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+ String query = "select * from hive.sub_dir_table";
+ try {
+ alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+ test(query);
+ } finally {
+ resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+ }
+
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(containsString("IOException: Not a file"));
+ test(query);
}
private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 80da976..c5c0d48 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -50,6 +50,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("hive.default", "partition_with_few_schemas")
.baselineValues("hive.default", "kv_native")
.baselineValues("hive.default", "kv_native_ext")
+ .baselineValues("hive.default", "sub_dir_table")
.go();
testBuilder()
@@ -254,6 +255,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
.baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
.baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
+ .baselineValues("DRILL", "hive.default", "sub_dir_table", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
.baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index f206999..074cb3b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -25,6 +25,7 @@ import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
-import com.google.common.collect.Maps;
import org.apache.hadoop.hive.serde.serdeConstants;
import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -80,7 +80,7 @@ public class HiveTestDataGenerator {
this.whDir = whDir;
this.dirTestWatcher = dirTestWatcher;
- config = Maps.newHashMap();
+ config = new HashMap<>();
config.put(ConfVars.METASTOREURIS.toString(), "");
config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
config.put("hive.metastore.warehouse.dir", whDir);
@@ -89,7 +89,9 @@ public class HiveTestDataGenerator {
/**
* Add Hive test storage plugin to the given plugin registry.
- * @throws Exception
+ *
+ * @param pluginRegistry storage plugin registry
+ * @throws Exception in case if unable to update Hive storage plugin
*/
public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) throws Exception {
HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
@@ -101,7 +103,8 @@ public class HiveTestDataGenerator {
/**
* Update the current HiveStoragePlugin in given plugin registry with given <i>configOverride</i>.
*
- * @param configOverride
+ * @param pluginRegistry storage plugin registry
+ * @param configOverride config properties to be overridden
* @throws DrillException if fails to update or no Hive plugin currently exists in given plugin registry.
*/
public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, Map<String, String> configOverride)
@@ -113,7 +116,7 @@ public class HiveTestDataGenerator {
}
HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
- newPluginConfig.getHiveConfigOverride().putAll(configOverride);
+ newPluginConfig.getConfigProps().putAll(configOverride);
pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true);
}
@@ -344,7 +347,7 @@ public class HiveTestDataGenerator {
"charType CHAR(10))"
);
- /**
+ /*
* Create a PARQUET table with all supported types.
*/
executeQuery(hiveDriver,
@@ -542,6 +545,8 @@ public class HiveTestDataGenerator {
createTestDataForDrillNativeParquetReaderTests(hiveDriver);
+ createSubDirTable(hiveDriver, testDataFile);
+
ss.close();
}
@@ -594,56 +599,61 @@ public class HiveTestDataGenerator {
"location '%s'", thirdPartition));
}
+ private void createSubDirTable(Driver hiveDriver, String testDataFile) {
+ String tableName = "sub_dir_table";
+ dirTestWatcher.copyResourceToRoot(Paths.get(testDataFile), Paths.get(tableName, "sub_dir", "data.txt"));
+
+ String tableLocation = Paths.get(dirTestWatcher.getRootDir().toURI().getPath(), tableName).toUri().getPath();
+
+ String tableDDL = String.format("create external table sub_dir_table (key int, value string) " +
+ "row format delimited fields terminated by ',' stored as textfile location '%s'", tableLocation);
+ executeQuery(hiveDriver, tableDDL);
+ }
+
private File getTempFile() throws Exception {
return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile();
}
private String generateTestDataFile() throws Exception {
- final File file = getTempFile();
- PrintWriter printWriter = new PrintWriter(file);
- for (int i=1; i<=5; i++) {
- printWriter.println (String.format("%d, key_%d", i, i));
+ File file = getTempFile();
+ try (PrintWriter printWriter = new PrintWriter(file)) {
+ for (int i = 1; i <= 5; i++) {
+ printWriter.println(String.format("%d, key_%d", i, i));
+ }
}
- printWriter.close();
-
return file.getPath();
}
private String generateTestDataFileForPartitionInput() throws Exception {
- final File file = getTempFile();
-
- PrintWriter printWriter = new PrintWriter(file);
-
- String partValues[] = {"1", "2", "null"};
-
- for(int c = 0; c < partValues.length; c++) {
- for(int d = 0; d < partValues.length; d++) {
- for(int e = 0; e < partValues.length; e++) {
- for (int i = 1; i <= 5; i++) {
- Date date = new Date(System.currentTimeMillis());
- Timestamp ts = new Timestamp(System.currentTimeMillis());
- printWriter.printf("%s,%s,%s,%s,%s",
- date.toString(), ts.toString(), partValues[c], partValues[d], partValues[e]);
- printWriter.println();
+ File file = getTempFile();
+ try (PrintWriter printWriter = new PrintWriter(file)) {
+ String partValues[] = {"1", "2", "null"};
+ for (String partValue : partValues) {
+ for (String partValue1 : partValues) {
+ for (String partValue2 : partValues) {
+ for (int i = 1; i <= 5; i++) {
+ Date date = new Date(System.currentTimeMillis());
+ Timestamp ts = new Timestamp(System.currentTimeMillis());
+ printWriter.printf("%s,%s,%s,%s,%s", date.toString(), ts.toString(), partValue, partValue1, partValue2);
+ printWriter.println();
+ }
}
}
}
}
- printWriter.close();
-
return file.getPath();
}
private String generateAllTypesDataFile() throws Exception {
File file = getTempFile();
- PrintWriter printWriter = new PrintWriter(file);
- printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689," +
- "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield," +
- "2013-07-05 17:01:00,2013-07-05,charfield");
- printWriter.println(",,,,,,,,,,,,,,,,");
- printWriter.close();
+ try (PrintWriter printWriter = new PrintWriter(file)) {
+ printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"+
+ "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"+
+ "2013-07-05 17:01:00,2013-07-05,charfield");
+ printWriter.println(",,,,,,,,,,,,,,,,");
+ }
return file.getPath();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 49f149b..4c840a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -410,6 +410,9 @@ public final class ExecConstants {
public static final OptionValidator HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR =
new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
+ public static final String HIVE_CONF_PROPERTIES = "store.hive.conf.properties";
+ public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new StringValidator(HIVE_CONF_PROPERTIES);
+
public static final String SLICE_TARGET = "planner.slice_target";
public static final long SLICE_TARGET_DEFAULT = 100000l;
public static final PositiveLongValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE);
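Editorial note: store.hive.conf.properties is introduced here as a plain string option guarded by a StringValidator; its value is expected to carry newline-separated name=value pairs, as documented in the drill-module.conf hunk at the end of this commit. The code that actually parses and applies that value on the Hive side is elsewhere in this commit and not shown in this excerpt, so the following is only a minimal, hypothetical sketch of how such a value can be read, relying on the fact that java.util.Properties.load() already understands one key=value pair per line.

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class HiveConfPropertiesSketch {
  public static void main(String[] args) throws IOException {
    // Sample session value: one property per line, no quotes around the values.
    String optionValue = "hive.mapred.supports.subdirectories=true\n"
        + "mapred.input.dir.recursive=true";
    Properties props = new Properties();
    props.load(new StringReader(optionValue)); // parses newline-separated name=value pairs
    props.forEach((name, value) -> System.out.println(name + " -> " + value));
  }
}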
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 36e74a0..e7518b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.opt;
-import com.google.common.collect.Lists;
-
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -28,9 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.GroupingAggregate;
import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.logical.data.Project;
@@ -53,6 +49,7 @@ import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnnestPOP;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.options.OptionManager;
@@ -64,6 +61,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
public class BasicOptimizer extends Optimizer {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -99,8 +97,7 @@ public class BasicOptimizer extends Optimizer {
.version(logicalProperties.version)
.generator(logicalProperties.generator)
.options(new JSONOptions(context.getOptions().getOptionList())).build();
- final PhysicalPlan p = new PhysicalPlan(props, physOps);
- return p;
+ return new PhysicalPlan(props, physOps);
}
public static class BasicOptimizationContext implements OptimizationContext {
@@ -128,30 +125,28 @@ public class BasicOptimizer extends Optimizer {
*/
private final LogicalPlan logicalPlan;
- public LogicalConverter(final LogicalPlan logicalPlan) {
+ LogicalConverter(final LogicalPlan logicalPlan) {
this.logicalPlan = logicalPlan;
}
@Override
public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
- final List<Ordering> orderDefs = Lists.newArrayList();
PhysicalOperator input = groupBy.getInput().accept(this, value);
if (groupBy.getKeys().size() > 0) {
- for(NamedExpression e : groupBy.getKeys()) {
- orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
- }
+ List<Ordering> orderDefs = groupBy.getKeys().stream()
+ .map(e -> new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST))
+ .collect(Collectors.toList());
input = new Sort(input, orderDefs, false);
}
- final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
- return sa;
+ return new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
}
@Override
public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException {
PhysicalOperator input = window.getInput().accept(this, value);
- final List<Ordering> ods = Lists.newArrayList();
+ final List<Ordering> ods = new ArrayList<>();
input = new Sort(input, ods, false);
@@ -162,11 +157,7 @@ public class BasicOptimizer extends Optimizer {
@Override
public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException {
final PhysicalOperator input = order.getInput().accept(this, value);
- final List<Ordering> ods = Lists.newArrayList();
- for (Ordering o : order.getOrderings()){
- ods.add(o);
- }
-
+ final List<Ordering> ods = new ArrayList<>(order.getOrderings());
return new SelectionVectorRemover(new Sort(input, ods, false));
}
@@ -180,18 +171,20 @@ public class BasicOptimizer extends Optimizer {
@Override
public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException {
PhysicalOperator leftOp = join.getLeft().accept(this, value);
- final List<Ordering> leftOrderDefs = Lists.newArrayList();
- for(JoinCondition jc : join.getConditions()){
- leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
- }
+
+ List<Ordering> leftOrderDefs = join.getConditions().stream()
+ .map(jc -> new Ordering(Direction.ASCENDING, jc.getLeft()))
+ .collect(Collectors.toList());
+
leftOp = new Sort(leftOp, leftOrderDefs, false);
leftOp = new SelectionVectorRemover(leftOp);
PhysicalOperator rightOp = join.getRight().accept(this, value);
- final List<Ordering> rightOrderDefs = Lists.newArrayList();
- for(JoinCondition jc : join.getConditions()){
- rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
- }
+
+ List<Ordering> rightOrderDefs = join.getConditions().stream()
+ .map(jc -> new Ordering(Direction.ASCENDING, jc.getRight()))
+ .collect(Collectors.toList());
+
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
@@ -210,7 +203,7 @@ public class BasicOptimizer extends Optimizer {
try {
final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
final String user = userSession.getSession().getCredentials().getUserName();
- return storagePlugin.getPhysicalScan(user, scan.getSelection());
+ return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions());
} catch (IOException | ExecutionSetupException e) {
throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
}
@@ -241,8 +234,8 @@ public class BasicOptimizer extends Optimizer {
}
@Override
- public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) throws OptimizerException {
- return new org.apache.drill.exec.physical.config.UnnestPOP(null, unnest.getColumn());
+ public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) {
+ return new UnnestPOP(null, unnest.getColumn());
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 4ee7671..53036f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.SchemalessScan;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,6 +46,7 @@ public abstract class DrillTable implements Table {
private final StoragePlugin plugin;
private final String userName;
private GroupScan scan;
+ private SessionOptionManager options;
/**
* Creates a DrillTable instance for a {@code TableType#Table} table.
@@ -85,12 +87,16 @@ public abstract class DrillTable implements Table {
this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
}
+ public void setOptions(SessionOptionManager options) {
+ this.options = options;
+ }
+
public GroupScan getGroupScan() throws IOException{
if (scan == null) {
if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot());
} else {
- this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
+ this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options);
}
}
return scan;
@@ -138,7 +144,8 @@ public abstract class DrillTable implements Table {
return true;
}
- @Override public boolean isRolledUp(String column) {
+ @Override
+ public boolean isRolledUp(String column) {
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3f65ad2..d6b0951 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -78,6 +78,7 @@ import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.logical.DrillConstExecutor;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -113,7 +114,6 @@ public class SqlConverter {
private final DrillConfig drillConfig;
private RelOptCluster cluster;
- private String sql;
private VolcanoPlanner planner;
private boolean useRootSchema = false;
@@ -187,12 +187,10 @@ public class SqlConverter {
public SqlNode validate(final SqlNode parsedNode) {
try {
- SqlNode validatedNode = validator.validate(parsedNode);
- return validatedNode;
+ return validator.validate(parsedNode);
} catch (RuntimeException e) {
UserException.Builder builder = UserException
- .validationError(e)
- .addContext("SQL Query", sql);
+ .validationError(e);
if (isInnerQuery) {
builder.message("Failure validating a view your query is dependent upon.");
}
@@ -240,7 +238,7 @@ public class SqlConverter {
private class DrillValidator extends SqlValidatorImpl {
- protected DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
+ DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
RelDataTypeFactory typeFactory, SqlConformance conformance) {
super(opTab, catalogReader, typeFactory, conformance);
}
@@ -382,15 +380,11 @@ public class SqlConverter {
//To avoid unexpected column errors set a value of top to false
final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false);
- final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
- return rel2;
+ return rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
}
private class Expander implements RelOptTable.ViewExpander {
- public Expander() {
- }
-
@Override
public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
@@ -550,7 +544,7 @@ public class SqlConverter {
* also preserving nullability.
*
* <p>Tries to expand the cast, and therefore the result may be something
- * other than a {@link RexCall} to the CAST operator, such as a
+ * other than a {@link org.apache.calcite.rex.RexCall} to the CAST operator, such as a
* {@link RexLiteral} if {@code matchNullability} is false.
*
* @param type Type to cast to
@@ -611,7 +605,7 @@ public class SqlConverter {
/**
* Disallows the presence of temporary tables in a SQL statement (for example, in view definitions)
*/
- public void disallowTemporaryTables() {
+ void disallowTemporaryTables() {
this.allowTemporaryTables = false;
}
@@ -647,13 +641,19 @@ public class SqlConverter {
}
}
- return super.getTable(names);
+ Prepare.PreparingTable table = super.getTable(names);
+ DrillTable unwrap;
+ // add session options if found table is Drill table
+ if (table != null && (unwrap = table.unwrap(DrillTable.class)) != null) {
+ unwrap.setOptions(session.getOptions());
+ }
+ return table;
}
@Override
public List<List<String>> getSchemaPaths() {
if (useRootSchema) {
- return ImmutableList.<List<String>>of(ImmutableList.<String>of());
+ return ImmutableList.of(ImmutableList.of());
}
return super.getSchemaPaths();
}
@@ -662,8 +662,8 @@ public class SqlConverter {
* check if the schema provided is a valid schema:
* <li>schema is not indicated (only one element in the names list)</li>
*
- * @param names list of schema and table names, table name is always the last element
- * @return throws a userexception if the schema is not valid.
+ * @param names list of schema and table names, table name is always the last element
+ * @throws UserException if the schema is not valid.
*/
private void isValidSchema(final List<String> names) throws UserException {
SchemaPlus defaultSchema = session.getDefaultSchema(this.rootSchema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a9c4742..a16bb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,11 +17,14 @@
*/
package org.apache.drill.exec.server.options;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.commons.collections.IteratorUtils;
import org.apache.drill.common.config.DrillConfig;
@@ -40,15 +43,13 @@ import org.apache.drill.exec.util.AssertionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
- * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
- * persist between restarts.
- */
/**
+ * <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
+ * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
+ * persist between restarts.
+ * </p>
+ *
+ * <p> All the system options are externalized into the conf files. When adding a new system option,
+ * a validator should be added and the default value for the option should be set in
+ * the conf files (for example, drill-module.conf) under the namespace drill.exec.options.
@@ -173,6 +174,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
+ new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR),
new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL),
@@ -237,11 +239,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
};
- final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap();
+ CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
+ .collect(Collectors.toMap(
+ d -> d.getValidator().getOptionName(),
+ Function.identity(),
+ (o, n) -> n,
+ CaseInsensitiveMap::newHashMap));
- for (final OptionDefinition definition: definitions) {
- map.put(definition.getValidator().getOptionName(), definition);
- }
if (AssertionUtil.isAssertionsEnabled()) {
map.put(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, new OptionDefinition(ExecConstants.DRILLBIT_CONTROLS_VALIDATOR));
@@ -295,7 +299,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
* Initializes this option manager.
*
* @return this option manager
- * @throws Exception
+ * @throws Exception if unable to initialize option manager
*/
public SystemOptionManager init() throws Exception {
options = provider.getOrCreateStore(config);
@@ -395,16 +399,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
@Override
public void deleteAllLocalOptions() {
- final Set<String> names = Sets.newHashSet();
- for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) {
- names.add(entry.getKey());
- }
- for (final String name : names) {
- options.delete(name); // should be lowercase
- }
+ Iterable<Map.Entry<String, PersistedOptionValue>> allOptions = () -> options.getAll();
+ StreamSupport.stream(allOptions.spliterator(), false)
+ .map(Entry::getKey)
+ .forEach(name -> options.delete(name)); // should be lowercase
}
- public static CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
+ private CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
// populate the options from the config
final Map<String, OptionValue> defaults = new HashMap<>();
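Editorial note on the hunks above: the rewritten Javadoc describes exactly the procedure this commit follows for store.hive.conf.properties, namely declaring a validator in ExecConstants, registering an OptionDefinition here, and providing a default in drill-module.conf below. Separately, the Collectors.toMap overload used to build the definitions map takes four arguments; the merge function and map supplier are easy to misread, so here is a small stand-alone illustration of the same idiom (a TreeMap stands in for Drill's CaseInsensitiveMap, purely for illustration).

import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapSketch {
  public static void main(String[] args) {
    // Key mapper, identity value mapper, "last one wins" merge function,
    // and an explicit supplier for the concrete map implementation.
    Map<String, String> byName = Stream.of("alpha", "BETA", "beta")
        .collect(Collectors.toMap(
            String::toLowerCase,
            Function.identity(),
            (oldValue, newValue) -> newValue,
            () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
    System.out.println(byName); // prints {alpha=alpha, beta=beta}
  }
}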
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index de2b2c3..d37a0a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.PlannerPhase;
import com.google.common.collect.ImmutableSet;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
/** Abstract class for StoragePlugin implementations.
* See StoragePlugin for description of the interface intent and its methods.
@@ -102,12 +103,23 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
}
}
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection);
+ }
+
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
}
@Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection, columns);
+ }
+
+ @Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 50f2731..2617065 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
/** Interface for all implementations of the storage plugins. Different implementations of the storage
* formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -36,18 +37,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
/** Indicates if Drill can read the table from this format.
*/
- public boolean supportsRead();
+ boolean supportsRead();
/** Indicates if Drill can write a table to this format (e.g. as JSON, csv, etc.).
*/
- public boolean supportsWrite();
+ boolean supportsWrite();
/** An implementation of this method will return one or more specialized rules that Drill query
* optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set.
* @return an empty set or a set of plugin specific physical optimizer rules.
*/
@Deprecated
- public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
+ Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
/**
* Get the physical scan operator for the particular GroupScan (read) node.
@@ -55,9 +56,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
* @param userName User to impersonate when reading the contents as part of the scan.
* @param selection The configured storage engine specific selection.
* @return The physical scan operator for the particular GroupScan (read) node.
- * @throws IOException
*/
- public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+ AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+
+ /**
+ * Get the physical scan operator for the particular GroupScan (read) node.
+ *
+ * @param userName User to impersonate when reading the contents as part of the scan.
+ * @param selection The configured storage engine specific selection.
+ * @param options (optional) session options
+ * @return The physical scan operator for the particular GroupScan (read) node.
+ */
+ AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException;
/**
* Get the physical scan operator for the particular GroupScan (read) node.
@@ -66,18 +76,29 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
* @param selection The configured storage engine specific selection.
* @param columns (optional) The list of column names to scan from the data source.
* @return The physical scan operator for the particular GroupScan (read) node.
- * @throws IOException
*/
- public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
- throws IOException;
+ AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException;
- /** Method returns a Jackson serializable object that extends a StoragePluginConfig
- * @return an extension of StoragePluginConfig
+ /**
+ * Get the physical scan operator for the particular GroupScan (read) node.
+ *
+ * @param userName User to impersonate when reading the contents as part of the scan.
+ * @param selection The configured storage engine specific selection.
+ * @param columns (optional) The list of column names to scan from the data source.
+ * @param options (optional) session options
+ * @return The physical scan operator for the particular GroupScan (read) node.
+ */
+ AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException;
+
+ /**
+ * Method returns a Jackson serializable object that extends a StoragePluginConfig.
+ *
+ * @return an extension of StoragePluginConfig
*/
- public StoragePluginConfig getConfig();
+ StoragePluginConfig getConfig();
/**
* Initialize the storage plugin. The storage plugin will not be used until this method is called.
*/
- public void start() throws IOException;
+ void start() throws IOException;
}
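Editorial note: the two new SessionOptionManager overloads keep StoragePlugin source-compatible, because AbstractStoragePlugin (previous hunk) provides defaults that ignore the options and delegate to the existing methods; only plugins that care about session settings need to override them. A hypothetical fragment of such an override is sketched below (illustrative only, not part of this commit; the session-aware behaviour for the Hive plugin lives elsewhere in this commit).

// Fragment of a hypothetical plugin extending AbstractStoragePlugin:
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
    List<SchemaPath> columns, SessionOptionManager options) throws IOException {
  if (options == null) {
    // DrillTable only attaches options when a session is present (see the SqlConverter
    // hunk above), so a null check keeps older call paths working.
    return getPhysicalScan(userName, selection, columns);
  }
  // ... consult the relevant session options here before building the group scan ...
  return getPhysicalScan(userName, selection, columns);
}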
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4b1e9dd..b0cc209 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -555,6 +555,10 @@ drill.exec.options: {
store.hive.optimize_scan_with_native_readers: false,
store.hive.parquet.optimize_scan_with_native_reader: false,
store.hive.maprdb_json.optimize_scan_with_native_reader: false,
+ # Property values should NOT be enclosed in double quotes or any other quotes.
+ # Property name and value should be separated by =.
+ # Properties should be separated by a new line (\n).
+ store.hive.conf.properties: "",
store.json.all_text_mode: false,
store.json.writer.allow_nan_inf: true,
store.json.reader.allow_nan_inf: true,
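Editorial note: since the new option is registered like any other string option (see the ExecConstants and SystemOptionManager hunks above), it can be adjusted per session, along the lines of ALTER SESSION SET `store.hive.conf.properties` = 'hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true'; the individual properties inside the quoted value follow exactly the name=value-per-line, no-quotes-around-values format described in the comment above.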